Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions openevolve/process_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ async def run_evolution(

# Early stopping tracking
early_stopping_enabled = self.config.early_stopping_patience is not None
termination_reason = None
if early_stopping_enabled:
best_score = float("-inf")
iterations_without_improvement = 0
Expand Down Expand Up @@ -686,6 +687,7 @@ async def run_evolution(
logger.info(
f"Target score {target_score} reached at iteration {completed_iteration}"
)
termination_reason = "target_score"
break

# Check early stopping
Expand Down Expand Up @@ -731,6 +733,7 @@ async def run_evolution(
f"No improvement for {iterations_without_improvement} iterations "
f"(best score: {best_score:.4f})"
)
termination_reason = "early_stopping"
break

else:
Expand All @@ -742,6 +745,7 @@ async def run_evolution(
f"Task successfully solved with score {best_score:.4f}."
)
self.early_stopping_triggered = True
termination_reason = "early_stopping"
break

except FutureTimeoutError:
Expand Down Expand Up @@ -777,17 +781,40 @@ async def run_evolution(
next_iteration += 1
break # Only submit one iteration per completion to maintain balance

# Handle shutdown
if self.shutdown_event.is_set():
logger.info("Shutdown requested, canceling remaining evaluations...")
# Handle shutdown / early termination by canceling pending work quickly.
if self.shutdown_event.is_set() and termination_reason is None:
termination_reason = "shutdown"

if termination_reason in {"target_score", "early_stopping", "shutdown"}:
if termination_reason == "shutdown":
logger.info("Shutdown requested, canceling remaining evaluations...")
else:
logger.info(
f"{termination_reason} triggered, canceling remaining pending evaluations..."
)
for future in pending_futures.values():
future.cancel()

# Determine completion reason if no explicit early termination was set.
if termination_reason is None:
if completed_iterations >= max_iterations:
termination_reason = "max_iterations"
elif not pending_futures:
termination_reason = "no_pending_futures"
elif self.shutdown_event.is_set():
termination_reason = "shutdown"
else:
termination_reason = "max_iterations"

# Log completion reason
if self.early_stopping_triggered:
if termination_reason == "early_stopping":
logger.info("✅ Evolution completed - Early stopping triggered due to convergence")
elif self.shutdown_event.is_set():
elif termination_reason == "target_score":
logger.info("✅ Evolution completed - Target score reached")
elif termination_reason == "shutdown":
logger.info("✅ Evolution completed - Shutdown requested")
elif termination_reason == "no_pending_futures":
logger.info("✅ Evolution completed - No pending futures remaining")
else:
logger.info("✅ Evolution completed - Maximum iterations reached")

Expand Down
41 changes: 41 additions & 0 deletions tests/test_process_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,47 @@ def test_request_shutdown(self):
# Verify shutdown event is set
self.assertTrue(controller.shutdown_event.is_set())

def test_target_score_fast_exit_cancels_pending_futures(self):
"""Test that reaching target score cancels pending futures immediately."""

async def run_test():
controller = ProcessParallelController(self.config, self.eval_file, self.database)
controller.executor = Mock() # run_evolution requires a started executor

done_future = MagicMock(spec=Future)
done_future.done.return_value = True
done_future.result.return_value = SerializableResult(
child_program_dict={
"id": "child_target_hit",
"code": "def evolved(): return 99",
"language": "python",
"parent_id": "test_0",
"generation": 1,
"metrics": {"combined_score": 0.95, "score": 0.95},
"iteration_found": 1,
"metadata": {"changes": "target score hit", "island": 0},
},
parent_id="test_0",
iteration_time=0.1,
iteration=1,
target_island=0,
)

pending_future = MagicMock(spec=Future)
pending_future.done.return_value = False
pending_future.cancel.return_value = True

with patch.object(
controller, "_submit_iteration", side_effect=[done_future, pending_future]
) as mock_submit:
await controller.run_evolution(start_iteration=1, max_iterations=2, target_score=0.9)

self.assertEqual(mock_submit.call_count, 2)
self.assertTrue(pending_future.cancel.called)
self.assertIn("child_target_hit", self.database.programs)

asyncio.run(run_test())

def test_serializable_result(self):
"""Test SerializableResult dataclass"""
result = SerializableResult(
Expand Down