Improve composer 2 to composer 3 migration script with various optimizations #13840
danieldeleo wants to merge 7 commits into GoogleCloudPlatform:main from
Conversation
… API calls using Google Auth and Requests.
… optimize `restore_dags_state` to use bulk unpause when possible.
… a 10-second delay after listing target DAGs.
…in the target environment.
Summary of Changes
Hello @danieldeleo, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly enhances the Composer migration script by transitioning from inefficient subprocess calls to direct API interactions. The change aims to improve the script's performance and reliability by leveraging native Python libraries for Google Cloud authentication and HTTP requests, and by introducing optimized bulk operations for managing Airflow DAGs during the migration process.
Code Review
This pull request is a significant improvement, refactoring the Composer migration script to use direct API calls instead of inefficient gcloud subprocesses. This enhances performance and simplifies the codebase. My review highlights a few potential issues, including incorrect wildcard usage in API calls, the handling of the protected airflow_monitoring DAG, and opportunities for further performance optimization and code consistency.
environment = self.get_environment(environment_name)
airflow_uri = environment["config"]["airflowUri"]

url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=%"  # Pause all DAGs using % as a wildcard
The Airflow REST API's dag_id_pattern parameter expects a glob expression. The % character is not a standard glob wildcard; * should be used to match all DAGs. Using % will likely result in no DAGs being matched, causing this function to fail silently.
Suggested change:
- url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=%"  # Pause all DAGs using % as a wildcard
+ url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=*"  # Pause all DAGs using * as a wildcard
Sorry Gemini, this suggestion is just incorrect: https://github.com/apache/airflow/blob/4404bc05b3e77bf1c50219ba2ec1da5ef560a684/airflow-core/src/airflow/api_fastapi/common/parameters.py#L253. Airflow clearly states that regular expressions are not supported and that you should use the % and _ wildcards.
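To illustrate the wildcard semantics the author is pointing at: Airflow's dag_id_pattern is matched SQL-LIKE style, where % matches any run of characters and _ matches a single character. The sketch below is my own emulation of that matching rule (not Airflow's code) and shows why % selects every DAG while * would only match a literal asterisk:

```python
import re

def like_match(pattern: str, value: str) -> bool:
    """Emulate SQL LIKE matching: % = any run of characters,
    _ = exactly one character. This mirrors how Airflow's
    dag_id_pattern filter treats its wildcards."""
    # Escape regex metacharacters, then translate the LIKE wildcards.
    regex = re.escape(pattern).replace("%", ".*").replace("_", ".")
    return re.fullmatch(regex, value) is not None

print(like_match("%", "airflow_monitoring"))  # % matches every DAG id -> True
print(like_match("*", "airflow_monitoring"))  # * is just a literal char -> False
```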
environment = self.get_environment(environment_name)
airflow_uri = environment["config"]["airflowUri"]

url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=%"  # Pause all DAGs using % as a wildcard
Similar to pause_all_dags, the dag_id_pattern parameter for the Airflow REST API expects a glob expression. Please use * instead of % to correctly match all DAGs.
Suggested change:
- url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=%"  # Pause all DAGs using % as a wildcard
+ url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=*"  # Unpause all DAGs using * as a wildcard
Again, this suggestion is incorrect: https://github.com/apache/airflow/blob/4404bc05b3e77bf1c50219ba2ec1da5ef560a684/airflow-core/src/airflow/api_fastapi/common/parameters.py#L253. Airflow clearly states that regular expressions are not supported and that you should use the % and _ wildcards.
- logger.info("Pausing DAG %s in the source environment.", dag["dag_id"])
- client.pause_dag(dag["dag_id"], source_environment_name)
- logger.info("DAG %s paused.", dag["dag_id"])
+ client.pause_all_dags(source_environment_name)
The previous implementation explicitly skipped the airflow_monitoring DAG when pausing DAGs. The new bulk operation will attempt to pause all DAGs, including this protected system DAG. This could lead to errors if the API call fails for this specific DAG and aborts the entire batch operation. Please ensure the bulk operation handles this case gracefully. This concern also applies to the unpausing logic.
airflow_monitoring is safe to pause because Composer automatically unpauses it shortly afterwards.
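The restore-state optimization mentioned in the commit list ("use bulk unpause when possible") can be reasoned about with a small pure function: if every DAG was unpaused in the source environment, a single bulk call replaces N per-DAG calls; otherwise only the DAGs that were running are unpaused individually. This is a sketch with hypothetical names, not the PR's actual code:

```python
def plan_restore(dag_states: dict[str, bool]) -> list[str]:
    """Given {dag_id: was_paused} captured from the source environment,
    return the unpause operations to run against the target.

    If no DAG was paused in the source, one bulk unpause
    (dag_id_pattern=%) replaces N per-DAG API calls.
    """
    to_unpause = [d for d, paused in dag_states.items() if not paused]
    if to_unpause and len(to_unpause) == len(dag_states):
        return ["UNPAUSE_ALL"]  # single bulk PATCH covers everything
    return [f"UNPAUSE:{d}" for d in to_unpause]

print(plan_restore({"dag_a": False, "dag_b": False}))  # ['UNPAUSE_ALL']
print(plan_restore({"dag_a": False, "dag_b": True}))   # ['UNPAUSE:dag_a']
```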
…ce redundant API calls.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Description
The current migration script runs Python subprocesses to call gcloud commands, which is inefficient and unnecessary given that the Airflow REST API can be called directly via the Airflow web server URL.
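A minimal sketch of that direct-API approach, using only the standard library (in the real script, google-auth supplies the bearer token; the token string and helper name here are placeholders, and the % wildcard mirrors the pattern used in the PR):

```python
import json
import urllib.request

def build_dags_request(airflow_uri: str, token: str,
                       is_paused: bool) -> urllib.request.Request:
    """Build an authenticated PATCH against the Airflow REST API that
    (un)pauses every DAG in one call, instead of shelling out to a
    gcloud subprocess once per DAG."""
    url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=%"
    return urllib.request.Request(
        url,
        data=json.dumps({"is_paused": is_paused}).encode(),
        headers={
            # In the real script this token would come from google-auth.
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="PATCH",
    )

req = build_dags_request("https://example-airflow.example.com",
                         "hypothetical-token", True)
print(req.get_method())  # PATCH
print(req.full_url)
```

Sending the request is then a single urllib.request.urlopen(req) call per environment, rather than one subprocess per DAG.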
Note: Before submitting a pull request, please open an issue for discussion if you are not associated with Google.
Checklist
- nox -s py-3.9 (see Test Environment Setup)
- nox -s lint (see Test Environment Setup)