Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions google/cloud/aiplatform/pipeline_job_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def create(
allow_queueing: bool = False,
max_run_count: Optional[int] = None,
max_concurrent_run_count: int = 1,
max_concurrent_active_run_count: Optional[int] = None,
service_account: Optional[str] = None,
network: Optional[str] = None,
create_request_timeout: Optional[float] = None,
Expand All @@ -155,6 +156,10 @@ def create(
Must be positive and <= 2^63-1.
max_concurrent_run_count (int):
Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule.
max_concurrent_active_run_count (int):
Optional. Maximum number of active runs that can be executed
concurrently for this PipelineJobSchedule. Active runs are those
in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED).
service_account (str):
Optional. Specifies the service account for workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
Expand All @@ -176,6 +181,7 @@ def create(
allow_queueing=allow_queueing,
max_run_count=max_run_count,
max_concurrent_run_count=max_concurrent_run_count,
max_concurrent_active_run_count=max_concurrent_active_run_count,
service_account=service_account,
network=network,
create_request_timeout=create_request_timeout,
Expand All @@ -189,6 +195,7 @@ def _create(
allow_queueing: bool = False,
max_run_count: Optional[int] = None,
max_concurrent_run_count: int = 1,
max_concurrent_active_run_count: Optional[int] = None,
service_account: Optional[str] = None,
network: Optional[str] = None,
create_request_timeout: Optional[float] = None,
Expand All @@ -215,6 +222,10 @@ def _create(
Must be positive and <= 2^63-1.
max_concurrent_run_count (int):
Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule.
max_concurrent_active_run_count (int):
Optional. Maximum number of active runs that can be executed
concurrently for this PipelineJobSchedule. Active runs are those
in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED).
service_account (str):
Optional. Specifies the service account for workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
Expand All @@ -239,6 +250,10 @@ def _create(
self._gca_resource.max_run_count = max_run_count
if max_concurrent_run_count:
self._gca_resource.max_concurrent_run_count = max_concurrent_run_count
if max_concurrent_active_run_count:
self._gca_resource.max_concurrent_active_run_count = (
max_concurrent_active_run_count
)

service_account = service_account or initializer.global_config.service_account
network = network or initializer.global_config.network
Expand Down Expand Up @@ -383,6 +398,7 @@ def update(
allow_queueing: Optional[bool] = None,
max_run_count: Optional[int] = None,
max_concurrent_run_count: Optional[int] = None,
max_concurrent_active_run_count: Optional[int] = None,
) -> None:
"""Update an existing PipelineJobSchedule.

Expand Down Expand Up @@ -415,6 +431,10 @@ def update(
Must be positive and <= 2^63-1.
max_concurrent_run_count (int):
Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule.
max_concurrent_active_run_count (int):
Optional. Maximum number of active runs that can be executed
concurrently for this PipelineJobSchedule. Active runs are those
in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED).

Raises:
RuntimeError: User tried to call update() before create().
Expand Down Expand Up @@ -451,6 +471,13 @@ def update(
"max_concurrent_run_count",
max_concurrent_run_count,
)
if max_concurrent_active_run_count is not None:
updated_fields.append("max_concurrent_active_run_count")
setattr(
pipeline_job_schedule,
"max_concurrent_active_run_count",
max_concurrent_active_run_count,
)

update_mask = field_mask.FieldMask(paths=updated_fields)
self.api_client.update_schedule(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def create(
allow_queueing: bool = False,
max_run_count: Optional[int] = None,
max_concurrent_run_count: int = 1,
max_concurrent_active_run_count: Optional[int] = None,
service_account: Optional[str] = None,
network: Optional[str] = None,
create_request_timeout: Optional[float] = None,
Expand All @@ -101,6 +102,10 @@ def create(
Must be positive and <= 2^63-1.
max_concurrent_run_count (int):
Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule.
max_concurrent_active_run_count (int):
Optional. Maximum number of active runs that can be executed
concurrently for this PipelineJobSchedule. Active runs are those
in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED).
service_account (str):
Optional. Specifies the service account for workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
Expand All @@ -120,6 +125,7 @@ def create(
allow_queueing=allow_queueing,
max_run_count=max_run_count,
max_concurrent_run_count=max_concurrent_run_count,
max_concurrent_active_run_count=max_concurrent_active_run_count,
service_account=service_account,
network=network,
create_request_timeout=create_request_timeout,
Expand Down Expand Up @@ -190,6 +196,7 @@ def update(
allow_queueing: Optional[bool] = None,
max_run_count: Optional[int] = None,
max_concurrent_run_count: Optional[int] = None,
max_concurrent_active_run_count: Optional[int] = None,
) -> None:
"""Update an existing PipelineJobSchedule.

Expand Down Expand Up @@ -222,6 +229,10 @@ def update(
Must be positive and <= 2^63-1.
max_concurrent_run_count (int):
Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule.
max_concurrent_active_run_count (int):
Optional. Maximum number of active runs that can be executed
concurrently for this PipelineJobSchedule. Active runs are those
in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED).

Raises:
RuntimeError: User tried to call update() before create().
Expand All @@ -234,4 +245,5 @@ def update(
allow_queueing=allow_queueing,
max_run_count=max_run_count,
max_concurrent_run_count=max_concurrent_run_count,
max_concurrent_active_run_count=max_concurrent_active_run_count,
)
10 changes: 10 additions & 0 deletions google/cloud/aiplatform/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,16 @@ def max_concurrent_run_count(self) -> int:
self._sync_gca_resource()
return self._gca_resource.max_concurrent_run_count

@property
def max_concurrent_active_run_count(self) -> int:
"""Current Schedule max_concurrent_active_run_count.

Returns:
Schedule max_concurrent_active_run_count.
"""
self._sync_gca_resource()
return self._gca_resource.max_concurrent_active_run_count

@property
def allow_queueing(self) -> bool:
"""Whether current Schedule allows queueing.
Expand Down
8 changes: 8 additions & 0 deletions google/cloud/aiplatform_v1/types/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ class Schedule(proto.Message):
the limit for starting the scheduled requests
and not the execution of the operations/jobs
created by the requests (if applicable).
max_concurrent_active_run_count (int):
Optional. Maximum number of active runs that can be executed
concurrently for this PipelineJobSchedule. Active runs are those
in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED).
allow_queueing (bool):
Optional. Whether new scheduled runs can be queued when
max_concurrent_runs limit is reached. If set to true, new
Expand Down Expand Up @@ -265,6 +269,10 @@ class RunResponse(proto.Message):
proto.INT64,
number=11,
)
max_concurrent_active_run_count: int = proto.Field(
proto.INT64,
number=21,
)
allow_queueing: bool = proto.Field(
proto.BOOL,
number=12,
Expand Down
8 changes: 8 additions & 0 deletions google/cloud/aiplatform_v1beta1/types/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ class Schedule(proto.Message):
the limit for starting the scheduled requests
and not the execution of the operations/jobs
created by the requests (if applicable).
max_concurrent_active_run_count (int):
Optional. Maximum number of active runs that can be executed
concurrently for this PipelineJobSchedule. Active runs are those
in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED).
allow_queueing (bool):
Optional. Whether new scheduled runs can be queued when
max_concurrent_runs limit is reached. If set to true, new
Expand Down Expand Up @@ -279,6 +283,10 @@ class RunResponse(proto.Message):
proto.INT64,
number=11,
)
max_concurrent_active_run_count: int = proto.Field(
proto.INT64,
number=21,
)
allow_queueing: bool = proto.Field(
proto.BOOL,
number=12,
Expand Down
61 changes: 61 additions & 0 deletions tests/unit/aiplatform/test_pipeline_job_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@
_TEST_PIPELINE_JOB_SCHEDULE_CRON = "* * * * *"
_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT = 1
_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT = 2
_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT = 10

_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_CRON = "1 1 1 1 1"
_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT = 5
_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT = 3

_TEST_TEMPLATE_PATH = f"gs://{_TEST_GCS_BUCKET_NAME}/job_spec.json"
_TEST_AR_TEMPLATE_PATH = "https://us-central1-kfp.pkg.dev/proj/repo/pack/latest"
Expand Down Expand Up @@ -472,6 +474,7 @@ def test_call_preview_schedule_service_create(
cron_expression=_TEST_PIPELINE_JOB_SCHEDULE_CRON,
max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
max_concurrent_active_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT,
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
create_request_timeout=None,
Expand All @@ -494,6 +497,7 @@ def test_call_preview_schedule_service_create(
cron=_TEST_PIPELINE_JOB_SCHEDULE_CRON,
max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
max_concurrent_active_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT,
create_pipeline_job_request={
"parent": _TEST_PARENT,
"pipeline_job": {
Expand Down Expand Up @@ -555,6 +559,7 @@ def test_call_schedule_service_create(
cron=_TEST_PIPELINE_JOB_SCHEDULE_CRON,
max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
max_concurrent_active_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT,
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
create_request_timeout=None,
Expand All @@ -577,6 +582,7 @@ def test_call_schedule_service_create(
cron=_TEST_PIPELINE_JOB_SCHEDULE_CRON,
max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
max_concurrent_active_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT,
create_pipeline_job_request={
"parent": _TEST_PARENT,
"pipeline_job": {
Expand Down Expand Up @@ -1947,6 +1953,7 @@ def test_call_schedule_service_update(
pipeline_job_schedule.update(
cron=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_CRON,
max_run_count=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
max_concurrent_active_run_count=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT,
)

expected_gapic_pipeline_job_schedule = gca_schedule.Schedule(
Expand All @@ -1956,6 +1963,7 @@ def test_call_schedule_service_update(
cron=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_CRON,
max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
max_run_count=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
max_concurrent_active_run_count=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT,
create_pipeline_job_request=_TEST_CREATE_PIPELINE_JOB_REQUEST,
)
assert (
Expand Down Expand Up @@ -2268,3 +2276,56 @@ def test_get_allow_queueing_before_create(
)

pipeline_job_schedule.allow_queueing

@pytest.mark.parametrize(
"job_spec",
[_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
)
def test_get_max_concurrent_active_run_count_before_create(
self,
mock_schedule_service_create,
mock_schedule_service_get,
mock_schedule_bucket_exists,
job_spec,
mock_load_yaml_and_json,
):
"""Gets the PipelineJobSchedule max_concurrent_active_run_count before creating.

Raises error because PipelineJobSchedule should be created first.
"""
aiplatform.init(
project=_TEST_PROJECT,
staging_bucket=_TEST_GCS_BUCKET_NAME,
location=_TEST_LOCATION,
credentials=_TEST_CREDENTIALS,
)

job = pipeline_jobs.PipelineJob(
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
template_path=_TEST_TEMPLATE_PATH,
parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
input_artifacts=_TEST_PIPELINE_INPUT_ARTIFACTS,
enable_caching=True,
)

pipeline_job_schedule = pipeline_job_schedules.PipelineJobSchedule(
pipeline_job=job,
display_name=_TEST_PIPELINE_JOB_SCHEDULE_DISPLAY_NAME,
)

with pytest.raises(RuntimeError) as e:
pipeline_job_schedule.max_concurrent_active_run_count

assert e.match(regexp=r"PipelineJobSchedule resource has not been created.")

pipeline_job_schedule.create(
cron=_TEST_PIPELINE_JOB_SCHEDULE_CRON,
max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
max_concurrent_active_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT,
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
create_request_timeout=None,
)

pipeline_job_schedule.max_concurrent_active_run_count
Loading