diff --git a/google/cloud/aiplatform/pipeline_job_schedules.py b/google/cloud/aiplatform/pipeline_job_schedules.py index 1b68b4cb93..e01173bbbb 100644 --- a/google/cloud/aiplatform/pipeline_job_schedules.py +++ b/google/cloud/aiplatform/pipeline_job_schedules.py @@ -129,6 +129,7 @@ def create( allow_queueing: bool = False, max_run_count: Optional[int] = None, max_concurrent_run_count: int = 1, + max_concurrent_active_run_count: Optional[int] = None, service_account: Optional[str] = None, network: Optional[str] = None, create_request_timeout: Optional[float] = None, @@ -155,6 +156,10 @@ def create( Must be positive and <= 2^63-1. max_concurrent_run_count (int): Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule. + max_concurrent_active_run_count (int): + Optional. Maximum number of active runs that can be executed + concurrently for this PipelineJobSchedule. Active runs are those + in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED). service_account (str): Optional. Specifies the service account for workload run-as account. Users submitting jobs must have act-as permission on this run-as account. @@ -176,6 +181,7 @@ def create( allow_queueing=allow_queueing, max_run_count=max_run_count, max_concurrent_run_count=max_concurrent_run_count, + max_concurrent_active_run_count=max_concurrent_active_run_count, service_account=service_account, network=network, create_request_timeout=create_request_timeout, @@ -189,6 +195,7 @@ def _create( allow_queueing: bool = False, max_run_count: Optional[int] = None, max_concurrent_run_count: int = 1, + max_concurrent_active_run_count: Optional[int] = None, service_account: Optional[str] = None, network: Optional[str] = None, create_request_timeout: Optional[float] = None, @@ -215,6 +222,10 @@ def _create( Must be positive and <= 2^63-1. max_concurrent_run_count (int): Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule. + max_concurrent_active_run_count (int): + Optional. Maximum number of active runs that can be executed + concurrently for this PipelineJobSchedule. Active runs are those + in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED). service_account (str): Optional. Specifies the service account for workload run-as account. Users submitting jobs must have act-as permission on this run-as account. @@ -239,6 +250,10 @@ def _create( self._gca_resource.max_run_count = max_run_count if max_concurrent_run_count: self._gca_resource.max_concurrent_run_count = max_concurrent_run_count + if max_concurrent_active_run_count: + self._gca_resource.max_concurrent_active_run_count = ( + max_concurrent_active_run_count + ) service_account = service_account or initializer.global_config.service_account network = network or initializer.global_config.network @@ -383,6 +398,7 @@ def update( allow_queueing: Optional[bool] = None, max_run_count: Optional[int] = None, max_concurrent_run_count: Optional[int] = None, + max_concurrent_active_run_count: Optional[int] = None, ) -> None: """Update an existing PipelineJobSchedule. @@ -415,6 +431,10 @@ def update( Must be positive and <= 2^63-1. max_concurrent_run_count (int): Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule. + max_concurrent_active_run_count (int): + Optional. Maximum number of active runs that can be executed + concurrently for this PipelineJobSchedule. Active runs are those + in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED). Raises: RuntimeError: User tried to call update() before create(). @@ -451,6 +471,13 @@ def update( "max_concurrent_run_count", max_concurrent_run_count, ) + if max_concurrent_active_run_count is not None: + updated_fields.append("max_concurrent_active_run_count") + setattr( + pipeline_job_schedule, + "max_concurrent_active_run_count", + max_concurrent_active_run_count, + ) update_mask = field_mask.FieldMask(paths=updated_fields) self.api_client.update_schedule( diff --git a/google/cloud/aiplatform/preview/pipelinejobschedule/pipeline_job_schedules.py b/google/cloud/aiplatform/preview/pipelinejobschedule/pipeline_job_schedules.py index 2c119c4922..02d0f97434 100644 --- a/google/cloud/aiplatform/preview/pipelinejobschedule/pipeline_job_schedules.py +++ b/google/cloud/aiplatform/preview/pipelinejobschedule/pipeline_job_schedules.py @@ -75,6 +75,7 @@ def create( allow_queueing: bool = False, max_run_count: Optional[int] = None, max_concurrent_run_count: int = 1, + max_concurrent_active_run_count: Optional[int] = None, service_account: Optional[str] = None, network: Optional[str] = None, create_request_timeout: Optional[float] = None, @@ -101,6 +102,10 @@ def create( Must be positive and <= 2^63-1. max_concurrent_run_count (int): Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule. + max_concurrent_active_run_count (int): + Optional. Maximum number of active runs that can be executed + concurrently for this PipelineJobSchedule. Active runs are those + in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED). service_account (str): Optional. Specifies the service account for workload run-as account. Users submitting jobs must have act-as permission on this run-as account. @@ -120,6 +125,7 @@ def create( allow_queueing=allow_queueing, max_run_count=max_run_count, max_concurrent_run_count=max_concurrent_run_count, + max_concurrent_active_run_count=max_concurrent_active_run_count, service_account=service_account, network=network, create_request_timeout=create_request_timeout, @@ -190,6 +196,7 @@ def update( allow_queueing: Optional[bool] = None, max_run_count: Optional[int] = None, max_concurrent_run_count: Optional[int] = None, + max_concurrent_active_run_count: Optional[int] = None, ) -> None: """Update an existing PipelineJobSchedule. @@ -222,6 +229,10 @@ def update( Must be positive and <= 2^63-1. max_concurrent_run_count (int): Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule. + max_concurrent_active_run_count (int): + Optional. Maximum number of active runs that can be executed + concurrently for this PipelineJobSchedule. Active runs are those + in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED). Raises: RuntimeError: User tried to call update() before create(). @@ -234,4 +245,5 @@ def update( allow_queueing=allow_queueing, max_run_count=max_run_count, max_concurrent_run_count=max_concurrent_run_count, + max_concurrent_active_run_count=max_concurrent_active_run_count, ) diff --git a/google/cloud/aiplatform/schedules.py b/google/cloud/aiplatform/schedules.py index 2083a60dbc..6ea72d4ce7 100644 --- a/google/cloud/aiplatform/schedules.py +++ b/google/cloud/aiplatform/schedules.py @@ -186,6 +186,16 @@ def max_concurrent_run_count(self) -> int: self._sync_gca_resource() return self._gca_resource.max_concurrent_run_count + @property + def max_concurrent_active_run_count(self) -> int: + """Current Schedule max_concurrent_active_run_count. + + Returns: + Schedule max_concurrent_active_run_count. + """ + self._sync_gca_resource() + return self._gca_resource.max_concurrent_active_run_count + @property def allow_queueing(self) -> bool: """Whether current Schedule allows queueing. diff --git a/google/cloud/aiplatform_v1/types/schedule.py b/google/cloud/aiplatform_v1/types/schedule.py index cdb61246d4..2d2b7d2e0c 100644 --- a/google/cloud/aiplatform_v1/types/schedule.py +++ b/google/cloud/aiplatform_v1/types/schedule.py @@ -119,6 +119,10 @@ class Schedule(proto.Message): the limit for starting the scheduled requests and not the execution of the operations/jobs created by the requests (if applicable). + max_concurrent_active_run_count (int): + Optional. Maximum number of active runs that can be executed + concurrently for this PipelineJobSchedule. Active runs are those + in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED). allow_queueing (bool): Optional. Whether new scheduled runs can be queued when max_concurrent_runs limit is reached. If set to true, new @@ -265,6 +269,10 @@ class RunResponse(proto.Message): proto.INT64, number=11, ) + max_concurrent_active_run_count: int = proto.Field( + proto.INT64, + number=21, + ) allow_queueing: bool = proto.Field( proto.BOOL, number=12, diff --git a/google/cloud/aiplatform_v1beta1/types/schedule.py b/google/cloud/aiplatform_v1beta1/types/schedule.py index 1f8c2a41cc..59d1f1e216 100644 --- a/google/cloud/aiplatform_v1beta1/types/schedule.py +++ b/google/cloud/aiplatform_v1beta1/types/schedule.py @@ -125,6 +125,10 @@ class Schedule(proto.Message): the limit for starting the scheduled requests and not the execution of the operations/jobs created by the requests (if applicable). + max_concurrent_active_run_count (int): + Optional. Maximum number of active runs that can be executed + concurrently for this PipelineJobSchedule. Active runs are those + in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED). allow_queueing (bool): Optional. Whether new scheduled runs can be queued when max_concurrent_runs limit is reached. If set to true, new @@ -279,6 +283,10 @@ class RunResponse(proto.Message): proto.INT64, number=11, ) + max_concurrent_active_run_count: int = proto.Field( + proto.INT64, + number=21, + ) allow_queueing: bool = proto.Field( proto.BOOL, number=12, diff --git a/tests/unit/aiplatform/test_pipeline_job_schedules.py b/tests/unit/aiplatform/test_pipeline_job_schedules.py index c27b7b6086..b042716fbb 100644 --- a/tests/unit/aiplatform/test_pipeline_job_schedules.py +++ b/tests/unit/aiplatform/test_pipeline_job_schedules.py @@ -72,9 +72,11 @@ _TEST_PIPELINE_JOB_SCHEDULE_CRON = "* * * * *" _TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT = 1 _TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT = 2 +_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT = 10 _TEST_UPDATED_PIPELINE_JOB_SCHEDULE_CRON = "1 1 1 1 1" _TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT = 5 +_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT = 3 _TEST_TEMPLATE_PATH = f"gs://{_TEST_GCS_BUCKET_NAME}/job_spec.json" _TEST_AR_TEMPLATE_PATH = "https://us-central1-kfp.pkg.dev/proj/repo/pack/latest" @@ -472,6 +474,7 @@ def test_call_preview_schedule_service_create( cron_expression=_TEST_PIPELINE_JOB_SCHEDULE_CRON, max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT, max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT, + max_concurrent_active_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT, service_account=_TEST_SERVICE_ACCOUNT, network=_TEST_NETWORK, create_request_timeout=None, @@ -494,6 +497,7 @@ def test_call_preview_schedule_service_create( cron=_TEST_PIPELINE_JOB_SCHEDULE_CRON, max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT, max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT, + max_concurrent_active_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT, create_pipeline_job_request={ "parent": _TEST_PARENT, "pipeline_job": { @@ -555,6 +559,7 @@ def test_call_schedule_service_create( cron=_TEST_PIPELINE_JOB_SCHEDULE_CRON, max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT, max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT, + max_concurrent_active_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT, service_account=_TEST_SERVICE_ACCOUNT, network=_TEST_NETWORK, create_request_timeout=None, @@ -577,6 +582,7 @@ def test_call_schedule_service_create( cron=_TEST_PIPELINE_JOB_SCHEDULE_CRON, max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT, max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT, + max_concurrent_active_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT, create_pipeline_job_request={ "parent": _TEST_PARENT, "pipeline_job": { @@ -1947,6 +1953,7 @@ def test_call_schedule_service_update( pipeline_job_schedule.update( cron=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_CRON, max_run_count=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT, + max_concurrent_active_run_count=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT, ) expected_gapic_pipeline_job_schedule = gca_schedule.Schedule( @@ -1956,6 +1963,7 @@ def test_call_schedule_service_update( cron=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_CRON, max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT, max_run_count=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT, + max_concurrent_active_run_count=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT, create_pipeline_job_request=_TEST_CREATE_PIPELINE_JOB_REQUEST, ) assert ( @@ -2268,3 +2276,56 @@ def test_get_allow_queueing_before_create( ) pipeline_job_schedule.allow_queueing + + @pytest.mark.parametrize( + "job_spec", + [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB], + ) + def test_get_max_concurrent_active_run_count_before_create( + self, + mock_schedule_service_create, + mock_schedule_service_get, + mock_schedule_bucket_exists, + job_spec, + mock_load_yaml_and_json, + ): + """Gets the PipelineJobSchedule max_concurrent_active_run_count before creating. + + Raises error because PipelineJobSchedule should be created first. + """ + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_GCS_BUCKET_NAME, + location=_TEST_LOCATION, + credentials=_TEST_CREDENTIALS, + ) + + job = pipeline_jobs.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + template_path=_TEST_TEMPLATE_PATH, + parameter_values=_TEST_PIPELINE_PARAMETER_VALUES, + input_artifacts=_TEST_PIPELINE_INPUT_ARTIFACTS, + enable_caching=True, + ) + + pipeline_job_schedule = pipeline_job_schedules.PipelineJobSchedule( + pipeline_job=job, + display_name=_TEST_PIPELINE_JOB_SCHEDULE_DISPLAY_NAME, + ) + + with pytest.raises(RuntimeError) as e: + pipeline_job_schedule.max_concurrent_active_run_count + + assert e.match(regexp=r"PipelineJobSchedule resource has not been created.") + + pipeline_job_schedule.create( + cron=_TEST_PIPELINE_JOB_SCHEDULE_CRON, + max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT, + max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT, + max_concurrent_active_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT, + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + create_request_timeout=None, + ) + + pipeline_job_schedule.max_concurrent_active_run_count