
Improve composer 2 to composer 3 migration script with various optimizations#13840

Open
danieldeleo wants to merge 7 commits into GoogleCloudPlatform:main from danieldeleo:main

Conversation

@danieldeleo
Contributor

Description

The current migration script runs Python subprocesses to call gcloud commands, which is inefficient and unnecessary given that the Airflow REST API can be called directly via the Airflow web server URL.

Note: Before submitting a pull request, please open an issue for discussion if you are not associated with Google.

Checklist

@danieldeleo danieldeleo requested review from a team as code owners February 19, 2026 02:12
@product-auto-label product-auto-label bot added samples Issues that are directly related to samples. api: composer Issues related to the Cloud Composer API. labels Feb 19, 2026
@gemini-code-assist
Contributor

Summary of Changes

Hello @danieldeleo, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the Composer migration script by transitioning from inefficient subprocess calls to direct API interactions. The change aims to improve the script's performance and reliability by leveraging native Python libraries for Google Cloud authentication and HTTP requests, and by introducing optimized bulk operations for managing Airflow DAGs during the migration process.

Highlights

  • API-driven Composer Interaction: The ComposerClient class was refactored to directly interact with the Google Cloud Composer API and Airflow REST API using google.auth and requests libraries, replacing previous gcloud subprocess calls and curl commands. This change significantly improves efficiency and reduces overhead.
  • Bulk DAG Operations: New methods pause_all_dags and unpause_all_dags were introduced to allow for bulk pausing and unpausing of DAGs in a Composer environment, leveraging the Airflow REST API's dag_id_pattern functionality.
  • Asynchronous Operation Handling: A private helper method _wait_for_operation was added to poll and manage long-running asynchronous operations returned by the Composer API, ensuring proper completion and error handling for tasks like environment creation and snapshot management.
  • Migration Script Optimization: The main migration logic was updated to utilize the new bulk DAG operations, streamlining the pausing of all DAGs in the source environment and optimizing the unpausing process in the target environment based on the source DAGs' initial paused state.
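The asynchronous-operation handling described above can be sketched as a small poller. This is a minimal illustration, not the PR's actual code: the helper name mirrors the `_wait_for_operation` method the highlights mention, and the `fetch` callable stands in for a closure around an `AuthorizedSession.get` call on the Composer operation URL.

```python
import time


def wait_for_operation(fetch, poll_interval=0.0, timeout=60.0):
    """Poll a long-running Composer operation until it reports done.

    `fetch` is any callable returning the operation's current JSON dict
    (in practice, a closure around AuthorizedSession.get); injecting it
    keeps the helper testable without network access.
    """
    deadline = time.monotonic() + timeout
    while True:
        op = fetch()
        if op.get("done"):
            # Completed operations carry either an error or a response.
            if "error" in op:
                raise RuntimeError(f"Operation failed: {op['error']}")
            return op.get("response", {})
        if time.monotonic() > deadline:
            raise TimeoutError("Operation did not complete in time")
        time.sleep(poll_interval)


# Simulated operation that completes on the third poll.
states = iter(
    [{"done": False}, {"done": False}, {"done": True, "response": {"name": "env"}}]
)
result = wait_for_operation(lambda: next(states))
```

In the script itself, such a poller would be called after environment creation and snapshot save/load requests, which all return long-running operations.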
Changelog
  • composer/tools/composer_migrate.py
    • Removed subprocess import and added time, google.auth, google.auth.transport.requests.AuthorizedSession, and requests imports.
    • Refactored ComposerClient to initialize google.auth credentials and an AuthorizedSession for direct API calls.
    • Updated get_environment, create_environment_from_config, list_dags, pause_dag, unpause_dag, save_snapshot, and load_snapshot methods to use direct HTTP requests instead of gcloud commands.
    • Added new methods pause_all_dags and unpause_all_dags for bulk DAG management.
    • Implemented a private helper method _wait_for_operation to handle asynchronous API operations.
    • Removed the run_shell_command utility function as it is no longer needed.
    • Modified the main function to use pause_all_dags for the source environment.
    • Improved the DAG presence check in the target environment with explicit logging for missing DAGs and added time.sleep for polling.
    • Optimized the unpausing logic in the target environment to use unpause_all_dags if all source DAGs were unpaused, otherwise iterating individually.
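The unpausing optimization in the last changelog item can be reduced to a small pure decision function. The function below is a sketch of that logic under my own naming, not code from the PR: it returns a bulk marker when every source DAG was unpaused, and an explicit id list otherwise.

```python
def dags_to_unpause(source_dags):
    """Decide how to unpause DAGs in the target environment.

    Returns ("all", None) when every source DAG was unpaused, so a single
    bulk unpause_all_dags call suffices; otherwise returns ("each", ids)
    with only the DAG ids that were unpaused in the source environment.
    """
    unpaused = [d["dag_id"] for d in source_dags if not d["is_paused"]]
    if len(unpaused) == len(source_dags):
        return ("all", None)
    return ("each", unpaused)
```

Separating the decision from the HTTP calls this way keeps the branch easy to unit-test without touching a live environment.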
Activity
  • The pull request introduces a significant refactoring of the Composer migration script, moving from subprocess-based gcloud commands to direct API calls using Python's requests and google.auth libraries.
  • New functionality for bulk pausing and unpausing of DAGs has been added to improve efficiency during the migration process.
  • A mechanism for waiting on long-running asynchronous operations from the Composer API has been implemented.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder at the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request is a significant improvement, refactoring the Composer migration script to use direct API calls instead of inefficient gcloud subprocesses. This enhances performance and simplifies the codebase. My review highlights a few potential issues, including incorrect wildcard usage in API calls, the handling of the protected airflow_monitoring DAG, and opportunities for further performance optimization and code consistency.

environment = self.get_environment(environment_name)
airflow_uri = environment["config"]["airflowUri"]

url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=%" # Pause all DAGs using % as a wildcard
Contributor

high

The Airflow REST API's dag_id_pattern parameter expects a glob expression. The % character is not a standard glob wildcard; * should be used to match all DAGs. Using % will likely result in no DAGs being matched, causing this function to fail silently.

Suggested change
url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=%" # Pause all DAGs using % as a wildcard
url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=*" # Pause all DAGs using * as a wildcard

Contributor Author

Sorry Gemini, this suggestion is incorrect: https://github.com/apache/airflow/blob/4404bc05b3e77bf1c50219ba2ec1da5ef560a684/airflow-core/src/airflow/api_fastapi/common/parameters.py#L253. Airflow clearly states that regular expressions are not supported and that you should use the % and _ wildcards.
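For illustration, here is one way such a bulk-PATCH request could be assembled with the % wildcard the author defends. This is a hedged sketch, not the PR's code: the helper name is mine, and the update_mask parameter is an assumption based on Airflow's stable REST API. Note that urlencode escapes the literal % as %25 so it survives transport intact.

```python
from urllib.parse import urlencode


def bulk_patch_request(airflow_uri, pattern="%", paused=True):
    """Build the URL and body for Airflow's bulk DAG PATCH endpoint.

    dag_id_pattern uses SQL-style wildcards (% and _), not glob or regex;
    update_mask restricts the patch to the is_paused field.
    """
    query = urlencode({"dag_id_pattern": pattern, "update_mask": "is_paused"})
    url = f"{airflow_uri}/api/v1/dags?{query}"
    body = {"is_paused": paused}
    return url, body


url, body = bulk_patch_request("https://example.com")
```

The resulting pair would then be sent via an AuthorizedSession PATCH call in the real script.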

environment = self.get_environment(environment_name)
airflow_uri = environment["config"]["airflowUri"]

url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=%" # Pause all DAGs using % as a wildcard
Contributor

high

Similar to pause_all_dags, the dag_id_pattern parameter for the Airflow REST API expects a glob expression. Please use * instead of % to correctly match all DAGs.

Suggested change
url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=%" # Pause all DAGs using % as a wildcard
url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=*" # Unpause all DAGs using * as a wildcard

Contributor Author

Again, this suggestion is incorrect: https://github.com/apache/airflow/blob/4404bc05b3e77bf1c50219ba2ec1da5ef560a684/airflow-core/src/airflow/api_fastapi/common/parameters.py#L253. Airflow clearly states that regular expressions are not supported and that you should use the % and _ wildcards.

logger.info("Pausing DAG %s in the source environment.", dag["dag_id"])
client.pause_dag(dag["dag_id"], source_environment_name)
logger.info("DAG %s paused.", dag["dag_id"])
client.pause_all_dags(source_environment_name)
Contributor

high

The previous implementation explicitly skipped the airflow_monitoring DAG when pausing DAGs. The new bulk operation will attempt to pause all DAGs, including this protected system DAG. This could lead to errors if the API call fails for this specific DAG and aborts the entire batch operation. Please ensure the bulk operation handles this case gracefully. This concern also applies to the unpausing logic.

Contributor Author

airflow_monitoring is safe to pause because Composer automatically unpauses it shortly afterwards.

ddeleo and others added 2 commits February 18, 2026 22:26
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
