Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions openevolve/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ class DatabaseConfig:

novelty_llm: Optional["LLMInterface"] = None
embedding_model: Optional[str] = None
embedding_base_url: Optional[str] = None
similarity_threshold: float = 0.99


Expand Down
2 changes: 1 addition & 1 deletion openevolve/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def __init__(self, config: DatabaseConfig):

self.novelty_llm = config.novelty_llm
self.embedding_client = (
EmbeddingClient(config.embedding_model) if config.embedding_model else None
EmbeddingClient(config.embedding_model, base_url=config.embedding_base_url) if config.embedding_model else None
)
self.similarity_threshold = config.similarity_threshold

Expand Down
39 changes: 14 additions & 25 deletions openevolve/embedding.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,51 +10,40 @@

logger = logging.getLogger(__name__)

M = 1_000_000

OPENAI_EMBEDDING_MODELS = [
"text-embedding-3-small",
"text-embedding-3-large",
]

AZURE_EMBEDDING_MODELS = [
"azure-text-embedding-3-small",
"azure-text-embedding-3-large",
]

OPENAI_EMBEDDING_COSTS = {
"text-embedding-3-small": 0.02 / M,
"text-embedding-3-large": 0.13 / M,
}


class EmbeddingClient:
def __init__(self, model_name: str = "text-embedding-3-small"):
def __init__(self, model_name: str = "text-embedding-3-small", base_url: str | None = None):
"""
Initialize the EmbeddingClient.

Args:
model (str): The OpenAI embedding model name to use.
model_name: The embedding model name to use.
base_url: Optional base URL for the embedding API endpoint.
"""
self.client, self.model = self._get_client_model(model_name)
self.client, self.model = self._get_client_model(model_name, base_url)

def _get_client_model(self, model_name: str) -> tuple[openai.OpenAI, str]:
if model_name in OPENAI_EMBEDDING_MODELS:
# Use OPENAI_EMBEDDING_API_KEY if set, otherwise fall back to OPENAI_API_KEY
# This allows users to use OpenRouter for LLMs while using OpenAI for embeddings
embedding_api_key = os.getenv("OPENAI_EMBEDDING_API_KEY") or os.getenv("OPENAI_API_KEY")
client = openai.OpenAI(api_key=embedding_api_key)
model_to_use = model_name
elif model_name in AZURE_EMBEDDING_MODELS:
def _get_client_model(
self, model_name: str, base_url: str | None = None
) -> tuple[openai.OpenAI, str]:
if model_name in AZURE_EMBEDDING_MODELS:
# get rid of the azure- prefix
model_to_use = model_name.split("azure-")[-1]
client = openai.AzureOpenAI(
api_key=os.getenv("AZURE_OPENAI_API_KEY"),
api_version=os.getenv("AZURE_API_VERSION"),
azure_endpoint=os.getenv("AZURE_API_ENDPOINT"),
azure_endpoint=os.environ["AZURE_API_ENDPOINT"],
)
else:
raise ValueError(f"Invalid embedding model: {model_name}")
# Use OPENAI_EMBEDDING_API_KEY if set, otherwise fall back to OPENAI_API_KEY
# This allows users to use OpenRouter for LLMs while using OpenAI for embeddings
embedding_api_key = os.getenv("OPENAI_EMBEDDING_API_KEY") or os.getenv("OPENAI_API_KEY")
client = openai.OpenAI(api_key=embedding_api_key, base_url=base_url)
model_to_use = model_name

return client, model_to_use

Expand Down
86 changes: 81 additions & 5 deletions openevolve/process_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@
import asyncio
import logging
import multiprocessing as mp
import pickle
import signal
import time
from concurrent.futures import Future, ProcessPoolExecutor
from concurrent.futures import BrokenExecutor, Future, ProcessPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional

from openevolve.config import Config
from openevolve.database import Program, ProgramDatabase
Expand Down Expand Up @@ -357,6 +354,10 @@ def __init__(
self.num_workers = config.evaluator.parallel_evaluations
self.num_islands = config.database.num_islands

# Recovery tracking for process pool crashes
self.recovery_attempts = 0
self.max_recovery_attempts = 3

logger.info(f"Initialized process parallel controller with {self.num_workers} workers")

def _serialize_config(self, config: Config) -> dict:
Expand Down Expand Up @@ -434,6 +435,38 @@ def stop(self) -> None:

logger.info("Stopped process pool")

def _recover_process_pool(self, failed_iterations: list[int] | None = None) -> None:
"""Recover from a crashed process pool by recreating it.

Args:
failed_iterations: List of iteration numbers that failed and need re-queuing
"""
import gc

logger.warning("Process pool crashed, attempting recovery...")

# Shutdown broken executor without waiting (it's already broken)
if self.executor:
try:
self.executor.shutdown(wait=False, cancel_futures=True)
except Exception:
pass # Executor may already be in bad state
self.executor = None

# Force garbage collection to free memory before restarting
gc.collect()

# Brief delay to let system stabilize (memory freed, processes cleaned up)
time.sleep(2.0)

# Recreate the pool
self.start()

if failed_iterations:
logger.info(f"Pool recovered. {len(failed_iterations)} iterations will be re-queued.")
else:
logger.info("Pool recovered successfully.")

def request_shutdown(self) -> None:
"""Request graceful shutdown"""
logger.info("Graceful shutdown requested...")
Expand Down Expand Up @@ -559,6 +592,14 @@ async def run_evolution(
# Reconstruct program from dict
child_program = Program(**result.child_program_dict)

# Reset recovery counter on successful iteration
if self.recovery_attempts > 0:
logger.info(
f"Pool stable after recovery, resetting recovery counter "
f"(was {self.recovery_attempts})"
)
self.recovery_attempts = 0

# Add to database with explicit target_island to ensure proper island placement
# This fixes issue #391: children should go to the target island, not inherit
# from the parent (which may be from a different island due to fallback sampling)
Expand Down Expand Up @@ -752,6 +793,38 @@ async def run_evolution(
)
# Cancel the future to clean up the process
future.cancel()
except BrokenExecutor as e:
logger.error(f"Process pool crashed during iteration {completed_iteration}: {e}")

# Collect all failed iterations from pending futures
failed_iterations = [completed_iteration] + list(pending_futures.keys())

# Clear pending futures (they're all invalid now)
pending_futures.clear()
for island_id in island_pending:
island_pending[island_id].clear()

# Attempt recovery
self.recovery_attempts += 1
if self.recovery_attempts > self.max_recovery_attempts:
logger.error(
f"Max recovery attempts ({self.max_recovery_attempts}) exceeded. "
f"Stopping evolution."
)
break

self._recover_process_pool(failed_iterations)

# Re-queue failed iterations (distribute across islands)
for i, failed_iter in enumerate(failed_iterations):
if failed_iter < total_iterations:
island_id = i % self.num_islands
future = self._submit_iteration(failed_iter, island_id)
if future:
pending_futures[failed_iter] = future
island_pending[island_id].append(failed_iter)

continue
except Exception as e:
logger.error(f"Error processing result from iteration {completed_iteration}: {e}")

Expand Down Expand Up @@ -822,6 +895,9 @@ def _submit_iteration(

return future

except BrokenExecutor:
# Let this propagate up to run_evolution for recovery
raise
except Exception as e:
logger.error(f"Error submitting iteration {iteration}: {e}")
return None
Loading