diff --git a/datastore/samples/snippets/requirements-test.txt b/datastore/samples/snippets/requirements-test.txt
new file mode 100644
index 00000000000..2a21e952015
--- /dev/null
+++ b/datastore/samples/snippets/requirements-test.txt
@@ -0,0 +1,7 @@
+backoff===1.11.1; python_version < "3.7"
+backoff==2.2.1; python_version >= "3.7"
+pytest===7.4.3; python_version == '3.7'
+pytest===8.3.5; python_version == '3.8'
+pytest===8.4.2; python_version == '3.9'
+pytest==9.0.2; python_version >= '3.10'
+flaky==3.8.1
diff --git a/datastore/samples/snippets/requirements.txt b/datastore/samples/snippets/requirements.txt
new file mode 100644
index 00000000000..7852f23b24e
--- /dev/null
+++ b/datastore/samples/snippets/requirements.txt
@@ -0,0 +1 @@
+google-cloud-datastore==2.23.0
\ No newline at end of file
diff --git a/datastore/samples/snippets/schedule-export/README.md b/datastore/samples/snippets/schedule-export/README.md
new file mode 100644
index 00000000000..a8501cddc34
--- /dev/null
+++ b/datastore/samples/snippets/schedule-export/README.md
@@ -0,0 +1,5 @@
+# Scheduling Datastore exports with Cloud Functions and Cloud Scheduler
+
+This sample application demonstrates how to schedule exports of your Datastore entities. To deploy this sample, see:
+
+[Scheduling exports](https://cloud.google.com/datastore/docs/schedule-export)
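The Cloud Function in `main.py` below expects the Cloud Scheduler job to publish a JSON body naming the destination bucket (required) and, optionally, the kinds and namespaces to export; Cloud Scheduler delivers that body base64-encoded under `event["data"]`. A minimal sketch of hand-building such an event for local experimentation (the bucket name and kind values are placeholders, and the comma-separated-string style mirrors the test file in this PR):

```python
import base64
import json

# Placeholder destination bucket and entity filter; adjust to your own values.
payload = json.dumps({"bucket": "gs://my-export-bucket", "kinds": "Task"})

# Cloud Scheduler base64-encodes the message body before delivery,
# so a locally built event mirrors that encoding.
event = {"data": base64.b64encode(payload.encode("utf-8"))}
```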
diff --git a/datastore/samples/snippets/schedule-export/main.py b/datastore/samples/snippets/schedule-export/main.py
new file mode 100644
index 00000000000..f91b1466913
--- /dev/null
+++ b/datastore/samples/snippets/schedule-export/main.py
@@ -0,0 +1,57 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import base64
+import json
+import os
+
+from google.cloud import datastore_admin_v1
+
+project_id = os.environ.get("GCP_PROJECT")
+client = datastore_admin_v1.DatastoreAdminClient()
+
+
+def datastore_export(event, context):
+    """Triggers a Datastore export from a Cloud Scheduler job.
+
+    Args:
+        event (dict): event["data"] must contain a JSON object encoded in
+            base-64. Cloud Scheduler encodes payloads in base-64 by default.
+            The object must include a 'bucket' value and can include 'kinds'
+            and 'namespaceIds' values.
+        context (google.cloud.functions.Context): The Cloud Functions event
+            metadata.
+    """
+    if "data" in event:
+        # Triggered via Cloud Scheduler: decode the inner data field of the JSON payload.
+        json_data = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
+    else:
+        # Otherwise, for instance when triggered via the Cloud Console on a Cloud Function, the event itself is the data.
+        json_data = event
+
+    bucket = json_data["bucket"]
+    entity_filter = datastore_admin_v1.EntityFilter()
+
+    if "kinds" in json_data:
+        entity_filter.kinds = json_data["kinds"]
+
+    if "namespaceIds" in json_data:
+        entity_filter.namespace_ids = json_data["namespaceIds"]
+
+    export_request = datastore_admin_v1.ExportEntitiesRequest(
+        project_id=project_id, output_url_prefix=bucket, entity_filter=entity_filter
+    )
+    operation = client.export_entities(request=export_request)
+    response = operation.result()
+    print(response)
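The function body is a thin wrapper around the Datastore Admin API's `export_entities` call, so the same export can be started from any Python environment with the client library installed, not only from a Cloud Function. A minimal sketch, assuming you substitute your own project ID and destination bucket (both placeholders below):

```python
from google.cloud import datastore_admin_v1

client = datastore_admin_v1.DatastoreAdminClient()

# Placeholder project ID and Cloud Storage destination.
request = datastore_admin_v1.ExportEntitiesRequest(
    project_id="your-project-id",
    output_url_prefix="gs://your-export-bucket",
    entity_filter=datastore_admin_v1.EntityFilter(kinds=["Task"]),
)

# export_entities returns a long-running operation; result() waits for completion.
operation = client.export_entities(request=request)
print(operation.result())
```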
diff --git a/datastore/samples/snippets/schedule-export/requirements-test.txt b/datastore/samples/snippets/schedule-export/requirements-test.txt
new file mode 100644
index 00000000000..cb982446b31
--- /dev/null
+++ b/datastore/samples/snippets/schedule-export/requirements-test.txt
@@ -0,0 +1,2 @@
+pytest===8.4.2; python_version == '3.9'
+pytest==9.0.2; python_version >= '3.10'
diff --git a/datastore/samples/snippets/schedule-export/requirements.txt b/datastore/samples/snippets/schedule-export/requirements.txt
new file mode 100644
index 00000000000..fa16c1e95ab
--- /dev/null
+++ b/datastore/samples/snippets/schedule-export/requirements.txt
@@ -0,0 +1 @@
+google-cloud-datastore==2.23.0
diff --git a/datastore/samples/snippets/schedule-export/schedule_export_test.py b/datastore/samples/snippets/schedule-export/schedule_export_test.py
new file mode 100644
index 00000000000..48d9147c923
--- /dev/null
+++ b/datastore/samples/snippets/schedule-export/schedule_export_test.py
@@ -0,0 +1,73 @@
+# Copyright 2019 Google LLC All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import base64
+from unittest.mock import Mock
+
+import main
+
+mock_context = Mock()
+mock_context.event_id = "617187464135194"
+mock_context.timestamp = "2020-04-15T22:09:03.761Z"
+
+
+def test_datastore_export(capsys):
+    # Test an export without an entity filter
+    bucket = "gs://my-bucket"
+    json_string = '{{ "bucket": "{bucket}" }}'.format(bucket=bucket)
+
+    # Encode data like Cloud Scheduler
+    data = bytes(json_string, "utf-8")
+    data_encoded = base64.b64encode(data)
+    event = {"data": data_encoded}
+
+    # Mock the Datastore service
+    mockDatastore = Mock()
+    main.client = mockDatastore
+
+    # Call tested function
+    main.datastore_export(event, mock_context)
+    out, err = capsys.readouterr()
+    export_args = mockDatastore.export_entities.call_args[1]
+    # Assert request includes test values
+    assert export_args["request"].output_url_prefix == bucket
+
+
+def test_datastore_export_entity_filter(capsys):
+    # Test an export with an entity filter
+    bucket = "gs://my-bucket"
+    kinds = "Users,Tasks"
+    namespaceIds = "Customer831,Customer157"
+    json_string = '{{ "bucket": "{bucket}", "kinds": "{kinds}", "namespaceIds": "{namespaceIds}" }}'.format(
+        bucket=bucket, kinds=kinds, namespaceIds=namespaceIds
+    )
+
+    # Encode data like Cloud Scheduler
+    data = bytes(json_string, "utf-8")
+    data_encoded = base64.b64encode(data)
+    event = {"data": data_encoded}
+
+    # Mock the Datastore service
+    mockDatastore = Mock()
+    main.client = mockDatastore
+
+    # Call tested function
+    main.datastore_export(event, mock_context)
+    out, err = capsys.readouterr()
+    export_args = mockDatastore.export_entities.call_args[1]
+    # Assert request includes test values
+
+    assert export_args["request"].output_url_prefix == bucket
+    assert export_args["request"].entity_filter.kinds == kinds
+    assert export_args["request"].entity_filter.namespace_ids == namespaceIds
diff --git a/datastore/samples/snippets/snippets.py b/datastore/samples/snippets/snippets.py
new file mode 100644
index 00000000000..1b86ba8b0cd
--- /dev/null
+++ b/datastore/samples/snippets/snippets.py
@@ -0,0 +1,513 @@
+# Copyright 2022 Google, Inc.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +import argparse +from datetime import datetime, timedelta, timezone +from pprint import pprint +import time + +from google.cloud import datastore # noqa: I100 + + +def _preamble(): + # [START datastore_size_coloration_query] + from google.cloud import datastore + + # For help authenticating your client, visit + # https://cloud.google.com/docs/authentication/getting-started + client = datastore.Client() + + # [END datastore_size_coloration_query] + assert client is not None + + +def in_query(client): + # [START datastore_in_query] + query = client.query(kind="Task") + query.add_filter("tag", "IN", ["learn", "study"]) + # [END datastore_in_query] + + return list(query.fetch()) + + +def not_equals_query(client): + # [START datastore_not_equals_query] + query = client.query(kind="Task") + query.add_filter("category", "!=", "work") + # [END datastore_not_equals_query] + + return list(query.fetch()) + + +def not_in_query(client): + # [START datastore_not_in_query] + query = client.query(kind="Task") + query.add_filter("category", "NOT_IN", ["work", "chores", "school"]) + # [END datastore_not_in_query] + + return list(query.fetch()) + + +def query_with_readtime(client): + # [START datastore_stale_read] + # Create a read time of 15 seconds in the past + read_time = datetime.now(timezone.utc) - timedelta(seconds=15) + + # Fetch an entity with read_time + task_key = client.key("Task", "sampletask") + entity = client.get(task_key, read_time=read_time) + + # Query Task entities with read_time + query = client.query(kind="Task") + tasks = query.fetch(read_time=read_time, limit=10) + # [END datastore_stale_read] + + results = list(tasks) + results.append(entity) + + return results + + +def count_query_in_transaction(client): + # [START datastore_count_in_transaction] + task1 = datastore.Entity(client.key("Task", "task1")) + task2 = datastore.Entity(client.key("Task", "task2")) + + task1["owner"] = "john" + task2["owner"] = "john" + + tasks = [task1, task2] + client.put_multi(tasks) + + with client.transaction() as transaction: + + tasks_of_john = client.query(kind="Task") + tasks_of_john.add_filter("owner", "=", "john") + total_tasks_query = client.aggregation_query(tasks_of_john) + + query_result = total_tasks_query.count(alias="tasks_count").fetch() + for task_result in query_result: + tasks_count = task_result[0] + if tasks_count.value < 2: + task3 = datastore.Entity(client.key("Task", "task3")) + task3["owner"] = "john" + transaction.put(task3) + tasks.append(task3) + else: + print(f"Found existing {tasks_count.value} tasks, rolling back") + client.entities_to_delete.extend(tasks) + raise ValueError("User 'John' cannot have more than 2 tasks") + # [END datastore_count_in_transaction] + + +def count_query_on_kind(client): + # [START datastore_count_on_kind] + task1 = datastore.Entity(client.key("Task", "task1")) + task2 = datastore.Entity(client.key("Task", "task2")) + + tasks = [task1, task2] + client.put_multi(tasks) + all_tasks_query = client.query(kind="Task") + all_tasks_count_query = client.aggregation_query(all_tasks_query).count() + query_result = all_tasks_count_query.fetch() + for aggregation_results in query_result: + for aggregation in aggregation_results: + print(f"Total tasks (accessible from default alias) is {aggregation.value}") + # [END datastore_count_on_kind] + return tasks + + +def count_query_with_limit(client): + # [START datastore_count_with_limit] + task1 = datastore.Entity(client.key("Task", "task1")) + task2 = datastore.Entity(client.key("Task", "task2")) + task3 = 
datastore.Entity(client.key("Task", "task3")) + + tasks = [task1, task2, task3] + client.put_multi(tasks) + all_tasks_query = client.query(kind="Task") + all_tasks_count_query = client.aggregation_query(all_tasks_query).count() + query_result = all_tasks_count_query.fetch(limit=2) + for aggregation_results in query_result: + for aggregation in aggregation_results: + print(f"We have at least {aggregation.value} tasks") + # [END datastore_count_with_limit] + return tasks + + +def count_query_property_filter(client): + # [START datastore_count_with_property_filter] + task1 = datastore.Entity(client.key("Task", "task1")) + task2 = datastore.Entity(client.key("Task", "task2")) + task3 = datastore.Entity(client.key("Task", "task3")) + + task1["done"] = True + task2["done"] = False + task3["done"] = True + + tasks = [task1, task2, task3] + client.put_multi(tasks) + completed_tasks = client.query(kind="Task").add_filter("done", "=", True) + remaining_tasks = client.query(kind="Task").add_filter("done", "=", False) + + completed_tasks_query = client.aggregation_query(query=completed_tasks).count( + alias="total_completed_count" + ) + remaining_tasks_query = client.aggregation_query(query=remaining_tasks).count( + alias="total_remaining_count" + ) + + completed_query_result = completed_tasks_query.fetch() + for aggregation_results in completed_query_result: + for aggregation_result in aggregation_results: + if aggregation_result.alias == "total_completed_count": + print(f"Total completed tasks count is {aggregation_result.value}") + + remaining_query_result = remaining_tasks_query.fetch() + for aggregation_results in remaining_query_result: + for aggregation_result in aggregation_results: + if aggregation_result.alias == "total_remaining_count": + print(f"Total remaining tasks count is {aggregation_result.value}") + # [END datastore_count_with_property_filter] + return tasks + + +def count_query_with_stale_read(client): + + tasks = [task for task in client.query(kind="Task").fetch()] + client.delete_multi(tasks) # ensure the database is empty before starting + + # [START datastore_count_query_with_stale_read] + task1 = datastore.Entity(client.key("Task", "task1")) + task2 = datastore.Entity(client.key("Task", "task2")) + + # Saving two tasks + task1["done"] = True + task2["done"] = False + client.put_multi([task1, task2]) + time.sleep(10) + + past_timestamp = datetime.now( + timezone.utc + ) # we have two tasks in database at this time. 
+ time.sleep(10) + + # Saving third task + task3 = datastore.Entity(client.key("Task", "task3")) + task3["done"] = False + client.put(task3) + + all_tasks = client.query(kind="Task") + all_tasks_count = client.aggregation_query( + query=all_tasks, + ).count(alias="all_tasks_count") + + # Executing aggregation query + query_result = all_tasks_count.fetch() + for aggregation_results in query_result: + for aggregation_result in aggregation_results: + print(f"Latest tasks count is {aggregation_result.value}") + + # Executing aggregation query with past timestamp + tasks_in_past = client.aggregation_query(query=all_tasks).count( + alias="tasks_in_past" + ) + tasks_in_the_past_query_result = tasks_in_past.fetch(read_time=past_timestamp) + for aggregation_results in tasks_in_the_past_query_result: + for aggregation_result in aggregation_results: + print(f"Stale tasks count is {aggregation_result.value}") + # [END datastore_count_query_with_stale_read] + return [task1, task2, task3] + + +def sum_query_on_kind(client): + # [START datastore_sum_aggregation_query_on_kind] + # Set up sample entities + # Use incomplete key to auto-generate ID + task1 = datastore.Entity(client.key("Task")) + task2 = datastore.Entity(client.key("Task")) + task3 = datastore.Entity(client.key("Task")) + + task1["hours"] = 5 + task2["hours"] = 3 + task3["hours"] = 1 + + tasks = [task1, task2, task3] + client.put_multi(tasks) + + # Execute sum aggregation query + all_tasks_query = client.query(kind="Task") + all_tasks_sum_query = client.aggregation_query(all_tasks_query).sum("hours") + query_result = all_tasks_sum_query.fetch() + for aggregation_results in query_result: + for aggregation in aggregation_results: + print(f"Total sum of hours in tasks is {aggregation.value}") + # [END datastore_sum_aggregation_query_on_kind] + return tasks + + +def sum_query_property_filter(client): + # [START datastore_sum_aggregation_query_with_filters] + # Set up sample entities + # Use incomplete key to auto-generate ID + task1 = datastore.Entity(client.key("Task")) + task2 = datastore.Entity(client.key("Task")) + task3 = datastore.Entity(client.key("Task")) + + task1["hours"] = 5 + task2["hours"] = 3 + task3["hours"] = 1 + + task1["done"] = True + task2["done"] = True + task3["done"] = False + + tasks = [task1, task2, task3] + client.put_multi(tasks) + + # Execute sum aggregation query with filters + completed_tasks = client.query(kind="Task").add_filter("done", "=", True) + completed_tasks_query = client.aggregation_query(query=completed_tasks).sum( + property_ref="hours", alias="total_completed_sum_hours" + ) + + completed_query_result = completed_tasks_query.fetch() + for aggregation_results in completed_query_result: + for aggregation_result in aggregation_results: + if aggregation_result.alias == "total_completed_sum_hours": + print( + f"Total sum of hours in completed tasks is {aggregation_result.value}" + ) + # [END datastore_sum_aggregation_query_with_filters] + return tasks + + +def avg_query_on_kind(client): + # [START datastore_avg_aggregation_query_on_kind] + # Set up sample entities + # Use incomplete key to auto-generate ID + task1 = datastore.Entity(client.key("Task")) + task2 = datastore.Entity(client.key("Task")) + task3 = datastore.Entity(client.key("Task")) + + task1["hours"] = 5 + task2["hours"] = 3 + task3["hours"] = 1 + + tasks = [task1, task2, task3] + client.put_multi(tasks) + + # Execute average aggregation query + all_tasks_query = client.query(kind="Task") + all_tasks_avg_query = 
client.aggregation_query(all_tasks_query).avg("hours") + query_result = all_tasks_avg_query.fetch() + for aggregation_results in query_result: + for aggregation in aggregation_results: + print(f"Total average of hours in tasks is {aggregation.value}") + # [END datastore_avg_aggregation_query_on_kind] + return tasks + + +def avg_query_property_filter(client): + # [START datastore_avg_aggregation_query_with_filters] + # Set up sample entities + # Use incomplete key to auto-generate ID + task1 = datastore.Entity(client.key("Task")) + task2 = datastore.Entity(client.key("Task")) + task3 = datastore.Entity(client.key("Task")) + + task1["hours"] = 5 + task2["hours"] = 3 + task3["hours"] = 1 + + task1["done"] = True + task2["done"] = True + task3["done"] = False + + tasks = [task1, task2, task3] + client.put_multi(tasks) + + # Execute average aggregation query with filters + completed_tasks = client.query(kind="Task").add_filter("done", "=", True) + completed_tasks_query = client.aggregation_query(query=completed_tasks).avg( + property_ref="hours", alias="total_completed_avg_hours" + ) + + completed_query_result = completed_tasks_query.fetch() + for aggregation_results in completed_query_result: + for aggregation_result in aggregation_results: + if aggregation_result.alias == "total_completed_avg_hours": + print( + f"Total average of hours in completed tasks is {aggregation_result.value}" + ) + # [END datastore_avg_aggregation_query_with_filters] + return tasks + + +def multiple_aggregations_query(client): + # [START datastore_multiple_aggregation_in_structured_query] + # Set up sample entities + # Use incomplete key to auto-generate ID + task1 = datastore.Entity(client.key("Task")) + task2 = datastore.Entity(client.key("Task")) + task3 = datastore.Entity(client.key("Task")) + + task1["hours"] = 5 + task2["hours"] = 3 + task3["hours"] = 1 + + tasks = [task1, task2, task3] + client.put_multi(tasks) + + # Execute query with multiple aggregations + all_tasks_query = client.query(kind="Task") + aggregation_query = client.aggregation_query(all_tasks_query) + # Add aggregations + aggregation_query.add_aggregations( + [ + datastore.aggregation.CountAggregation(alias="count_aggregation"), + datastore.aggregation.SumAggregation( + property_ref="hours", alias="sum_aggregation" + ), + datastore.aggregation.AvgAggregation( + property_ref="hours", alias="avg_aggregation" + ), + ] + ) + + query_result = aggregation_query.fetch() + for aggregation_results in query_result: + for aggregation in aggregation_results: + print(f"{aggregation.alias} value is {aggregation.value}") + # [END datastore_multiple_aggregation_in_structured_query] + return tasks + + +def explain_analyze_entity(client): + # [START datastore_query_explain_analyze_entity] + # Build the query with explain_options + # analzye = true to get back the query stats, plan info, and query results + query = client.query( + kind="Task", explain_options=datastore.ExplainOptions(analyze=True) + ) + + # initiate the query + iterator = query.fetch() + + # explain_metrics is only available after query is completed + for task_result in iterator: + print(task_result) + + # get the plan summary + plan_summary = iterator.explain_metrics.plan_summary + print(f"Indexes used: {plan_summary.indexes_used}") + + # get the execution stats + execution_stats = iterator.explain_metrics.execution_stats + print(f"Results returned: {execution_stats.results_returned}") + print(f"Execution duration: {execution_stats.execution_duration}") + print(f"Read operations: 
{execution_stats.read_operations}") + print(f"Debug stats: {execution_stats.debug_stats}") + # [END datastore_query_explain_analyze_entity] + + +def explain_entity(client): + # [START datastore_query_explain_entity] + # Build the query with explain_options + # by default (analyze = false), only plan_summary property is available + query = client.query(kind="Task", explain_options=datastore.ExplainOptions()) + + # initiate the query + iterator = query.fetch() + + # get the plan summary + plan_summary = iterator.explain_metrics.plan_summary + print(f"Indexes used: {plan_summary.indexes_used}") + # [END datastore_query_explain_entity] + + +def explain_analyze_aggregation(client): + # [START datastore_query_explain_analyze_aggregation] + # Build the aggregation query with explain_options + # analzye = true to get back the query stats, plan info, and query results + all_tasks_query = client.query(kind="Task") + count_query = client.aggregation_query( + all_tasks_query, explain_options=datastore.ExplainOptions(analyze=True) + ).count() + + # initiate the query + iterator = count_query.fetch() + + # explain_metrics is only available after query is completed + for task_result in iterator: + print(task_result) + + # get the plan summary + plan_summary = iterator.explain_metrics.plan_summary + print(f"Indexes used: {plan_summary.indexes_used}") + + # get the execution stats + execution_stats = iterator.explain_metrics.execution_stats + print(f"Results returned: {execution_stats.results_returned}") + print(f"Execution duration: {execution_stats.execution_duration}") + print(f"Read operations: {execution_stats.read_operations}") + print(f"Debug stats: {execution_stats.debug_stats}") + # [END datastore_query_explain_analyze_aggregation] + + +def explain_aggregation(client): + # [START datastore_query_explain_aggregation] + # Build the aggregation query with explain_options + # by default (analyze = false), only plan_summary property is available + all_tasks_query = client.query(kind="Task") + count_query = client.aggregation_query( + all_tasks_query, explain_options=datastore.ExplainOptions() + ).count() + + # initiate the query + iterator = count_query.fetch() + + # get the plan summary + plan_summary = iterator.explain_metrics.plan_summary + print(f"Indexes used: {plan_summary.indexes_used}") + # [END datastore_query_explain_aggregation] + + +def main(project_id): + client = datastore.Client(project_id) + + for name, function in globals().items(): + if name in ( + "main", + "_preamble", + "defaultdict", + "datetime", + "timezone", + "timedelta", + ) or not callable(function): + continue + + print(name) + pprint(function(client)) + print("\n-----------------\n") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Demonstrates datastore API operations." + ) + parser.add_argument("project_id", help="Your cloud project ID.") + + args = parser.parse_args() + + main(args.project_id) diff --git a/datastore/samples/snippets/snippets_test.py b/datastore/samples/snippets/snippets_test.py new file mode 100644 index 00000000000..ae3b2948b34 --- /dev/null +++ b/datastore/samples/snippets/snippets_test.py @@ -0,0 +1,249 @@ +# Copyright 2022 Google, Inc. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import backoff +import google.api_core.exceptions +from google.cloud import datastore +from google.cloud import datastore_admin_v1 +import pytest + +import snippets + +PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] + + +class CleanupClient(datastore.Client): + def __init__(self, *args, **kwargs): + super(CleanupClient, self).__init__(*args, **kwargs) + self.entities_to_delete = [] + self.keys_to_delete = [] + + def cleanup(self): + with self.batch(): + self.delete_multi( + list(set([x.key for x in self.entities_to_delete if x])) + + list(set(self.keys_to_delete)) + ) + + +@pytest.fixture +def client(): + client = CleanupClient(PROJECT) + yield client + client.cleanup() + + +@pytest.fixture(scope="session", autouse=True) +def setup_indexes(request): + # Set up required indexes + admin_client = datastore_admin_v1.DatastoreAdminClient() + + indexes = [] + done_property_index = datastore_admin_v1.Index.IndexedProperty( + name="done", direction=datastore_admin_v1.Index.Direction.ASCENDING + ) + hour_property_index = datastore_admin_v1.Index.IndexedProperty( + name="hours", direction=datastore_admin_v1.Index.Direction.ASCENDING + ) + done_hour_index = datastore_admin_v1.Index( + kind="Task", + ancestor=datastore_admin_v1.Index.AncestorMode.NONE, + properties=[done_property_index, hour_property_index], + ) + indexes.append(done_hour_index) + + for index in indexes: + request = datastore_admin_v1.CreateIndexRequest(project_id=PROJECT, index=index) + # Create the required index + # Dependant tests will fail until the index is ready + try: + admin_client.create_index(request) + # Pass if the index already exists + except (google.api_core.exceptions.AlreadyExists): + pass + + +@pytest.mark.flaky +class TestDatastoreSnippets: + # These tests mostly just test the absence of exceptions. 
+ + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_in_query(self, client): + tasks = snippets.in_query(client) + client.entities_to_delete.extend(tasks) + assert tasks is not None + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_not_equals_query(self, client): + tasks = snippets.not_equals_query(client) + client.entities_to_delete.extend(tasks) + assert tasks is not None + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_not_in_query(self, client): + tasks = snippets.not_in_query(client) + client.entities_to_delete.extend(tasks) + assert tasks is not None + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_query_with_readtime(self, client): + tasks = snippets.query_with_readtime(client) + client.entities_to_delete.extend(tasks) + assert tasks is not None + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_count_query_in_transaction(self, client): + with pytest.raises(ValueError) as excinfo: + snippets.count_query_in_transaction(client) + assert "User 'John' cannot have more than 2 tasks" in str(excinfo.value) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_count_query_on_kind(self, capsys, client): + tasks = snippets.count_query_on_kind(client) + captured = capsys.readouterr() + assert ( + captured.out.strip() == "Total tasks (accessible from default alias) is 2" + ) + assert captured.err == "" + + client.entities_to_delete.extend(tasks) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_count_query_with_limit(self, capsys, client): + tasks = snippets.count_query_with_limit(client) + captured = capsys.readouterr() + assert captured.out.strip() == "We have at least 2 tasks" + assert captured.err == "" + + client.entities_to_delete.extend(tasks) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_count_query_property_filter(self, capsys, client): + tasks = snippets.count_query_property_filter(client) + captured = capsys.readouterr() + + assert "Total completed tasks count is 2" in captured.out + assert "Total remaining tasks count is 1" in captured.out + assert captured.err == "" + + client.entities_to_delete.extend(tasks) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_count_query_with_stale_read(self, capsys, client): + tasks = snippets.count_query_with_stale_read(client) + captured = capsys.readouterr() + + assert "Latest tasks count is 3" in captured.out + assert "Stale tasks count is 2" in captured.out + assert captured.err == "" + + client.entities_to_delete.extend(tasks) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_sum_query_on_kind(self, capsys, client): + tasks = snippets.sum_query_on_kind(client) + captured = capsys.readouterr() + assert captured.out.strip() == "Total sum of hours in tasks is 9" + assert captured.err == "" + + client.entities_to_delete.extend(tasks) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_sum_query_property_filter(self, capsys, client): + tasks = snippets.sum_query_property_filter(client) + captured = capsys.readouterr() + assert captured.out.strip() == "Total sum of hours in completed tasks is 8" + assert captured.err == "" + + client.entities_to_delete.extend(tasks) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_avg_query_on_kind(self, capsys, client): + tasks = 
snippets.avg_query_on_kind(client) + captured = capsys.readouterr() + assert captured.out.strip() == "Total average of hours in tasks is 3.0" + assert captured.err == "" + + client.entities_to_delete.extend(tasks) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_avg_query_property_filter(self, capsys, client): + tasks = snippets.avg_query_property_filter(client) + captured = capsys.readouterr() + assert ( + captured.out.strip() == "Total average of hours in completed tasks is 4.0" + ) + assert captured.err == "" + + client.entities_to_delete.extend(tasks) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_multiple_aggregations_query(self, capsys, client): + tasks = snippets.multiple_aggregations_query(client) + captured = capsys.readouterr() + assert "avg_aggregation value is 3.0" in captured.out + assert "count_aggregation value is 3" in captured.out + assert "sum_aggregation value is 9" in captured.out + assert captured.err == "" + + client.entities_to_delete.extend(tasks) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_explain_analyze_entity(self, capsys, client): + snippets.explain_analyze_entity(client) + captured = capsys.readouterr() + assert ( + "Indexes used: [{'properties': '(__name__ ASC)', 'query_scope': 'Collection group'}]" + in captured.out + ) + assert "Results returned: 0" in captured.out + assert "Execution duration: 0:00" in captured.out + assert "Read operations: 0" in captured.out + assert "Debug stats: {" in captured.out + assert captured.err == "" + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_explain_entity(self, capsys, client): + snippets.explain_entity(client) + captured = capsys.readouterr() + assert ( + "Indexes used: [{'properties': '(__name__ ASC)', 'query_scope': 'Collection group'}]" + in captured.out + ) + assert captured.err == "" + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_explain_analyze_aggregation(self, capsys, client): + snippets.explain_analyze_aggregation(client) + captured = capsys.readouterr() + assert ( + "Indexes used: [{'properties': '(__name__ ASC)', 'query_scope': 'Collection group'}]" + in captured.out + ) + assert "Results returned: 1" in captured.out + assert "Execution duration: 0:00" in captured.out + assert "Read operations: 1" in captured.out + assert "Debug stats: {" in captured.out + assert captured.err == "" + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_explain_aggregation(self, capsys, client): + snippets.explain_aggregation(client) + captured = capsys.readouterr() + assert ( + "Indexes used: [{'properties': '(__name__ ASC)', 'query_scope': 'Collection group'}]" + in captured.out + ) + assert captured.err == ""
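Beyond the pytest suite above, the snippets module can also be exercised directly: `snippets.py`'s `main()` iterates over every snippet for a given project ID, or a single snippet can be called on its own. A minimal sketch of the latter, assuming a placeholder project ID with Datastore (Firestore in Datastore mode) enabled and cleaning up the sample entities afterwards:

```python
from google.cloud import datastore

import snippets

# Placeholder project ID; any project with Datastore access works.
client = datastore.Client(project="your-project-id")

# Runs the count-on-kind snippet, which writes two Task entities and prints the count.
tasks = snippets.count_query_on_kind(client)

# Remove the sample entities the snippet created.
client.delete_multi([task.key for task in tasks])
```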