From 8d131dceab3350211089627c026b06ac5ea920e3 Mon Sep 17 00:00:00 2001 From: Lukasz Wyszomirski Date: Fri, 5 Aug 2022 12:11:09 +0000 Subject: [PATCH] Dataform operators --- .../providers/google/cloud/hooks/dataform.py | 252 +++++++++++ .../providers/google/cloud/links/dataform.py | 59 +++ .../google/cloud/operators/dataform.py | 411 ++++++++++++++++++ .../google/cloud/sensors/dataform.py | 108 +++++ airflow/providers/google/provider.yaml | 16 + .../operators/cloud/dataform.rst | 110 +++++ docs/spelling_wordlist.txt | 1 + generated/provider_dependencies.json | 1 + .../google/cloud/hooks/test_dataform.py | 157 +++++++ .../google/cloud/operators/test_dataform.py | 176 ++++++++ .../google/cloud/dataform/__init__.py | 16 + .../google/cloud/dataform/example_dataform.py | 173 ++++++++ 12 files changed, 1480 insertions(+) create mode 100644 airflow/providers/google/cloud/hooks/dataform.py create mode 100644 airflow/providers/google/cloud/links/dataform.py create mode 100644 airflow/providers/google/cloud/operators/dataform.py create mode 100644 airflow/providers/google/cloud/sensors/dataform.py create mode 100644 docs/apache-airflow-providers-google/operators/cloud/dataform.rst create mode 100644 tests/providers/google/cloud/hooks/test_dataform.py create mode 100644 tests/providers/google/cloud/operators/test_dataform.py create mode 100644 tests/system/providers/google/cloud/dataform/__init__.py create mode 100644 tests/system/providers/google/cloud/dataform/example_dataform.py diff --git a/airflow/providers/google/cloud/hooks/dataform.py b/airflow/providers/google/cloud/hooks/dataform.py new file mode 100644 index 0000000000000..7e666486c55ce --- /dev/null +++ b/airflow/providers/google/cloud/hooks/dataform.py @@ -0,0 +1,252 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+import time
+from typing import Dict, Optional, Sequence, Tuple, Union
+
+from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
+from google.api_core.retry import Retry
+from google.cloud.dataform_v1beta1 import DataformClient
+from google.cloud.dataform_v1beta1.types import CompilationResult, WorkflowInvocation
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+
+
+class DataformHook(GoogleBaseHook):
+    """Hook for Google Cloud Dataform APIs."""
+
+    def get_dataform_client(
+        self,
+    ) -> DataformClient:
+        """Retrieves client library object that allows access to the Cloud Dataform service."""
+        return DataformClient(credentials=self._get_credentials())
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def wait_for_workflow_invocation(
+        self,
+        workflow_invocation_id: str,
+        repository_id: str,
+        project_id: str,
+        region: str,
+        wait_time: int = 10,
+        timeout: Optional[int] = None,
+    ) -> None:
+        """
+        Helper method which polls a workflow invocation until it reaches a terminal state.
+
+        :param workflow_invocation_id: The ID of the workflow invocation.
+        :param repository_id: The ID of the Dataform repository.
+        :param project_id: Required. The ID of the Google Cloud project the workflow invocation belongs to.
+        :param region: Required. The Google Cloud region in which to handle the request.
+        :param wait_time: Number of seconds between checks.
+        :param timeout: How many seconds to wait for the workflow invocation to finish before timing out.
+        """
+        if region is None:
+            raise TypeError("missing 1 required keyword argument: 'region'")
+        state = None
+        start = time.monotonic()
+        while state not in (
+            WorkflowInvocation.State.FAILED,
+            WorkflowInvocation.State.SUCCEEDED,
+            WorkflowInvocation.State.CANCELLED,
+        ):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(
+                    f"Timeout: workflow invocation {workflow_invocation_id} is not ready after {timeout}s"
+                )
+            time.sleep(wait_time)
+            try:
+                workflow_invocation = self.get_workflow_invocation(
+                    project_id=project_id,
+                    region=region,
+                    repository_id=repository_id,
+                    workflow_invocation_id=workflow_invocation_id,
+                )
+                state = workflow_invocation.state
+            except Exception as err:
+                self.log.info(
+                    "Retrying. Dataform API returned error when waiting for workflow invocation: %s", err
+                )
+
+        if state == WorkflowInvocation.State.FAILED:
+            raise AirflowException(f'Workflow Invocation failed:\n{workflow_invocation}')
+        if state == WorkflowInvocation.State.CANCELLED:
+            raise AirflowException(f'Workflow Invocation was cancelled:\n{workflow_invocation}')
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_compilation_result(
+        self,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        compilation_result: Union[CompilationResult, Dict],
+        retry: Union[Retry, _MethodDefault] = DEFAULT,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+    ) -> CompilationResult:
+        """
+        Creates a new CompilationResult in a given project and location.
+
+        :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+        :param region: Required. The ID of the Google Cloud region that the task belongs to.
+        :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+        :param compilation_result: Required. The compilation result to create.
+        :param retry: Designation of what errors, if any, should be retried.
+        :param timeout: The timeout for this request.
+ :param metadata: Strings which should be sent along with the request as metadata. + """ + client = self.get_dataform_client() + parent = f"projects/{project_id}/locations/{region}/repositories/{repository_id}" + return client.create_compilation_result( + request={ + "parent": parent, + "compilation_result": compilation_result, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def get_compilation_result( + self, + project_id: str, + region: str, + repository_id: str, + compilation_result_id: str, + retry: Union[Retry, _MethodDefault] = DEFAULT, + timeout: Optional[float] = None, + metadata: Sequence[Tuple[str, str]] = (), + ) -> CompilationResult: + """ + Fetches a single CompilationResult. + + :param project_id: Required. The ID of the Google Cloud project that the task belongs to. + :param region: Required. The ID of the Google Cloud region that the task belongs to. + :param repository_id: Required. The ID of the Dataform repository that the task belongs to. + :param compilation_result_id: The Id of the Dataform Compilation Result + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. + """ + client = self.get_dataform_client() + name = ( + f"projects/{project_id}/locations/{region}/repositories/" + f"{repository_id}/compilationResults/{compilation_result_id}" + ) + return client.get_compilation_result( + request={"name": name}, retry=retry, timeout=timeout, metadata=metadata + ) + + @GoogleBaseHook.fallback_to_default_project_id + def create_workflow_invocation( + self, + project_id: str, + region: str, + repository_id: str, + workflow_invocation: Union[WorkflowInvocation, Dict], + retry: Union[Retry, _MethodDefault] = DEFAULT, + timeout: Optional[float] = None, + metadata: Sequence[Tuple[str, str]] = (), + ) -> WorkflowInvocation: + """ + Creates a new WorkflowInvocation in a given Repository. + + :param project_id: Required. The ID of the Google Cloud project that the task belongs to. + :param region: Required. The ID of the Google Cloud region that the task belongs to. + :param repository_id: Required. The ID of the Dataform repository that the task belongs to. + :param workflow_invocation: Required. The workflow invocation resource to create. + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. + """ + client = self.get_dataform_client() + parent = f"projects/{project_id}/locations/{region}/repositories/{repository_id}" + return client.create_workflow_invocation( + request={"parent": parent, "workflow_invocation": workflow_invocation}, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def get_workflow_invocation( + self, + project_id: str, + region: str, + repository_id: str, + workflow_invocation_id: str, + retry: Union[Retry, _MethodDefault] = DEFAULT, + timeout: Optional[float] = None, + metadata: Sequence[Tuple[str, str]] = (), + ) -> WorkflowInvocation: + """ + Fetches a single WorkflowInvocation. + + :param project_id: Required. The ID of the Google Cloud project that the task belongs to. + :param region: Required. The ID of the Google Cloud region that the task belongs to. + :param repository_id: Required. 
The ID of the Dataform repository that the task belongs to. + :param workflow_invocation_id: Required. The workflow invocation resource's id. + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. + """ + client = self.get_dataform_client() + name = ( + f"projects/{project_id}/locations/{region}/repositories/" + f"{repository_id}/workflowInvocations/{workflow_invocation_id}" + ) + return client.get_workflow_invocation( + request={ + "name": name, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def cancel_workflow_invocation( + self, + project_id: str, + region: str, + repository_id: str, + workflow_invocation_id: str, + retry: Union[Retry, _MethodDefault] = DEFAULT, + timeout: Optional[float] = None, + metadata: Sequence[Tuple[str, str]] = (), + ): + """ + Requests cancellation of a running WorkflowInvocation. + + :param project_id: Required. The ID of the Google Cloud project that the task belongs to. + :param region: Required. The ID of the Google Cloud region that the task belongs to. + :param repository_id: Required. The ID of the Dataform repository that the task belongs to. + :param workflow_invocation_id: Required. The workflow invocation resource's id. + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. + """ + client = self.get_dataform_client() + name = ( + f"projects/{project_id}/locations/{region}/repositories/" + f"{repository_id}/workflowInvocations/{workflow_invocation_id}" + ) + client.cancel_workflow_invocation( + request={"name": name}, retry=retry, timeout=timeout, metadata=metadata + ) diff --git a/airflow/providers/google/cloud/links/dataform.py b/airflow/providers/google/cloud/links/dataform.py new file mode 100644 index 0000000000000..d3128481855ae --- /dev/null +++ b/airflow/providers/google/cloud/links/dataform.py @@ -0,0 +1,59 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""This module contains Google Dataflow links.""" +from typing import TYPE_CHECKING + +from airflow.models import BaseOperator +from airflow.providers.google.cloud.links.base import BaseGoogleLink + +if TYPE_CHECKING: + from airflow.utils.context import Context +DATAFORM_BASE_LINK = "https://pantheon.corp.google.com/bigquery/dataform" +DATAFORM_WORKFLOW_INVOCATION_LINK = ( + DATAFORM_BASE_LINK + + "/locations/{region}/repositories/{repository_id}/workflows/" + + "{workflow_invocation_id}?project={project_id}" +) + + +class DataformWorkflowInvocationLink(BaseGoogleLink): + """Helper class for constructing Dataflow Job Link""" + + name = "Dataform Workflow Invocation" + key = "dataform_workflow_invocation_config" + format_str = DATAFORM_WORKFLOW_INVOCATION_LINK + + @staticmethod + def persist( + operator_instance: BaseOperator, + context: "Context", + project_id: str, + region: str, + repository_id: str, + workflow_invocation_id: str, + ): + operator_instance.xcom_push( + context, + key=DataformWorkflowInvocationLink.key, + value={ + "project_id": project_id, + "region": region, + "repository_id": repository_id, + "workflow_invocation_id": workflow_invocation_id, + }, + ) diff --git a/airflow/providers/google/cloud/operators/dataform.py b/airflow/providers/google/cloud/operators/dataform.py new file mode 100644 index 0000000000000..f06e594b6d71d --- /dev/null +++ b/airflow/providers/google/cloud/operators/dataform.py @@ -0,0 +1,411 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import TYPE_CHECKING, Dict, Optional, Sequence, Tuple, Union + +from airflow.providers.google.cloud.links.dataform import DataformWorkflowInvocationLink + +if TYPE_CHECKING: + from airflow.utils.context import Context + +from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault +from google.api_core.retry import Retry +from google.cloud.dataform_v1beta1.types import CompilationResult, WorkflowInvocation + +from airflow.models import BaseOperator +from airflow.providers.google.cloud.hooks.dataform import DataformHook + + +class DataformCreateCompilationResultOperator(BaseOperator): + """ + Creates a new CompilationResult in a given project and location. + + :param project_id: Required. The ID of the Google Cloud project that the task belongs to. + :param region: Required. The ID of the Google Cloud region that the task belongs to. + :param repository_id: Required. The ID of the Dataform repository that the task belongs to. + :param compilation_result: Required. The compilation result to create. + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. 
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        compilation_result: Union[CompilationResult, Dict],
+        retry: Union[Retry, _MethodDefault] = DEFAULT,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.repository_id = repository_id
+        self.compilation_result = compilation_result
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context"):
+        hook = DataformHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+        result = hook.create_compilation_result(
+            project_id=self.project_id,
+            region=self.region,
+            repository_id=self.repository_id,
+            compilation_result=self.compilation_result,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return CompilationResult.to_dict(result)
+
+
+class DataformGetCompilationResultOperator(BaseOperator):
+    """
+    Fetches a single CompilationResult.
+
+    :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+    :param region: Required. The ID of the Google Cloud region that the task belongs to.
+    :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+    :param compilation_result_id: The ID of the Dataform compilation result.
+    :param retry: Designation of what errors, if any, should be retried.
+    :param timeout: The timeout for this request.
+    :param metadata: Strings which should be sent along with the request as metadata.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields = ("repository_id", "compilation_result_id", "delegate_to", "impersonation_chain")
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        compilation_result_id: str,
+        retry: Union[Retry, _MethodDefault] = DEFAULT,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.repository_id = repository_id
+        self.compilation_result_id = compilation_result_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context"):
+        hook = DataformHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+        result = hook.get_compilation_result(
+            project_id=self.project_id,
+            region=self.region,
+            repository_id=self.repository_id,
+            compilation_result_id=self.compilation_result_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return CompilationResult.to_dict(result)
+
+
+class DataformCreateWorkflowInvocationOperator(BaseOperator):
+    """
+    Creates a new WorkflowInvocation in a given Repository.
+
+    :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+    :param region: Required. The ID of the Google Cloud region that the task belongs to.
+    :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+    :param workflow_invocation: Required. The workflow invocation resource to create.
+    :param retry: Designation of what errors, if any, should be retried.
+    :param timeout: The timeout for this request.
+    :param metadata: Strings which should be sent along with the request as metadata.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :param asynchronous: Flag to return workflow_invocation_id from the Dataform API.
+        This is useful for submitting long running workflows and
+        waiting on them asynchronously using the DataformWorkflowInvocationStateSensor.
+    :param wait_time: Number of seconds between checks.
+    """
+
+    template_fields = ("workflow_invocation", "delegate_to", "impersonation_chain")
+    operator_extra_links = (DataformWorkflowInvocationLink(),)
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        workflow_invocation: Union[WorkflowInvocation, Dict],
+        retry: Union[Retry, _MethodDefault] = DEFAULT,
+        timeout: Optional[int] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        asynchronous: bool = False,
+        wait_time: int = 10,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.repository_id = repository_id
+        self.workflow_invocation = workflow_invocation
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        self.asynchronous = asynchronous
+        self.wait_time = wait_time
+
+    def execute(self, context: "Context"):
+        hook = DataformHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+        result = hook.create_workflow_invocation(
+            project_id=self.project_id,
+            region=self.region,
+            repository_id=self.repository_id,
+            workflow_invocation=self.workflow_invocation,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        workflow_invocation_id = result.name.split("/")[-1]
+        DataformWorkflowInvocationLink.persist(
+            operator_instance=self,
+            context=context,
+            project_id=self.project_id,
+            region=self.region,
+            repository_id=self.repository_id,
+            workflow_invocation_id=workflow_invocation_id,
+        )
+        if not self.asynchronous:
+            hook.wait_for_workflow_invocation(
+                workflow_invocation_id=workflow_invocation_id,
+                repository_id=self.repository_id,
+                project_id=self.project_id,
+                region=self.region,
+                timeout=self.timeout,
+                wait_time=self.wait_time,
+            )
+        return WorkflowInvocation.to_dict(result)
+
+
+class DataformGetWorkflowInvocationOperator(BaseOperator):
+    """
+    Fetches a single WorkflowInvocation.
+
+    :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+    :param region: Required. The ID of the Google Cloud region that the task belongs to.
+    :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+    :param workflow_invocation_id: Required. The workflow invocation resource's ID.
+    :param retry: Designation of what errors, if any, should be retried.
+    :param timeout: The timeout for this request.
+    :param metadata: Strings which should be sent along with the request as metadata.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields = ("repository_id", "workflow_invocation_id", "delegate_to", "impersonation_chain")
+    operator_extra_links = (DataformWorkflowInvocationLink(),)
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        workflow_invocation_id: str,
+        retry: Union[Retry, _MethodDefault] = DEFAULT,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.repository_id = repository_id
+        self.workflow_invocation_id = workflow_invocation_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context"):
+        hook = DataformHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+        result = hook.get_workflow_invocation(
+            project_id=self.project_id,
+            region=self.region,
+            repository_id=self.repository_id,
+            workflow_invocation_id=self.workflow_invocation_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return WorkflowInvocation.to_dict(result)
+
+
+class DataformCancelWorkflowInvocationOperator(BaseOperator):
+    """
+    Requests cancellation of a running WorkflowInvocation.
+
+    :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+    :param region: Required. The ID of the Google Cloud region that the task belongs to.
+    :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+    :param workflow_invocation_id: Required. The workflow invocation resource's ID.
+    :param retry: Designation of what errors, if any, should be retried.
+    :param timeout: The timeout for this request.
+    :param metadata: Strings which should be sent along with the request as metadata.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields = ("repository_id", "workflow_invocation_id", "delegate_to", "impersonation_chain")
+    operator_extra_links = (DataformWorkflowInvocationLink(),)
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        workflow_invocation_id: str,
+        retry: Union[Retry, _MethodDefault] = DEFAULT,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.repository_id = repository_id
+        self.workflow_invocation_id = workflow_invocation_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context"):
+        hook = DataformHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+        hook.cancel_workflow_invocation(
+            project_id=self.project_id,
+            region=self.region,
+            repository_id=self.repository_id,
+            workflow_invocation_id=self.workflow_invocation_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
diff --git a/airflow/providers/google/cloud/sensors/dataform.py b/airflow/providers/google/cloud/sensors/dataform.py
new file mode 100644
index 0000000000000..0df7350ccd348
--- /dev/null
+++ b/airflow/providers/google/cloud/sensors/dataform.py
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Cloud Dataform sensor."""
+from typing import TYPE_CHECKING, Iterable, Optional, Sequence, Set, Union
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.hooks.dataform import DataformHook
+from airflow.sensors.base import BaseSensorOperator
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class DataformWorkflowInvocationStateSensor(BaseSensorOperator):
+    """
+    Checks for the status of a Workflow Invocation in Google Cloud Dataform.
+
+    :param project_id: Required, the Google Cloud project ID that the workflow invocation belongs to.
+        If set to None or missing, the default project_id from the Google Cloud connection is used.
+    :param region: Required, the location of the Dataform workflow invocation (for example europe-west1).
+    :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+    :param workflow_invocation_id: Required, ID of the workflow invocation to be checked.
+    :param expected_statuses: The expected state of the operation.
+ See: + https://cloud.google.com/python/docs/reference/dataform/latest/google.cloud.dataform_v1beta1.types.WorkflowInvocation.State + :param failure_statuses: State that will terminate the sensor with an exception + :param gcp_conn_id: The connection ID to use connecting to Google Cloud. + :param delegate_to: The account to impersonate using domain-wide delegation of authority, + if any. For this to work, the service account making the request must have + domain-wide delegation enabled. See: + https://developers.google.com/identity/protocols/oauth2/service-account#delegatingauthority + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = ('workflow_invocation_id',) + + def __init__( + self, + *, + project_id: str, + region: str, + repository_id: str, + workflow_invocation_id: str, + expected_statuses: Union[Set[int], int], + failure_statuses: Optional[Iterable[int]] = None, + gcp_conn_id: str = 'google_cloud_default', + delegate_to: Optional[str] = None, + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.repository_id = repository_id + self.workflow_invocation_id = workflow_invocation_id + self.expected_statuses = ( + {expected_statuses} if isinstance(expected_statuses, int) else expected_statuses + ) + self.failure_statuses = failure_statuses + self.project_id = project_id + self.region = region + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + self.impersonation_chain = impersonation_chain + self.hook: Optional[DataformHook] = None + + def poke(self, context: 'Context') -> bool: + self.hook = DataformHook( + gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to, + impersonation_chain=self.impersonation_chain, + ) + + workflow_invocation = self.hook.get_workflow_invocation( + project_id=self.project_id, + region=self.region, + repository_id=self.repository_id, + workflow_invocation_id=self.workflow_invocation_id, + ) + workflow_status = workflow_invocation.state + if workflow_status is not None: + if self.failure_statuses and workflow_status in self.failure_statuses: + raise AirflowException( + f"Workflow Invocation with id '{self.workflow_invocation_id}' " + f"state is: {workflow_status}. Terminating sensor..." 
+ ) + + return workflow_status in self.expected_statuses diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index c3589140aadbb..c4ba4de716348 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -74,6 +74,7 @@ dependencies: - google-cloud-bigtable>=1.0.0,<2.0.0 - google-cloud-build>=3.0.0 - google-cloud-container>=2.2.0,<3.0.0 + - google-cloud-dataform>=0.2.0 - google-cloud-datacatalog>=3.0.0 - google-cloud-dataplex>=0.1.0 - google-cloud-dataproc>=3.1.0 @@ -157,6 +158,11 @@ integrations: how-to-guide: - /docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst tags: [google] + - integration-name: Google Cloud Dataform + external-doc-url: https://cloud.google.com/dataform/ + how-to-guide: + - /docs/apache-airflow-providers-google/operators/cloud/dataform.rst + tags: [google] - integration-name: Google Cloud Data Loss Prevention (DLP) external-doc-url: https://cloud.google.com/dlp/ how-to-guide: @@ -567,6 +573,9 @@ operators: - integration-name: Google Looker python-modules: - airflow.providers.google.cloud.operators.looker + - integration-name: Google Cloud Dataform + python-modules: + - airflow.providers.google.cloud.operators.dataform sensors: - integration-name: Google BigQuery @@ -617,6 +626,9 @@ sensors: - integration-name: Google Looker python-modules: - airflow.providers.google.cloud.sensors.looker + - integration-name: Google Cloud Dataform + python-modules: + - airflow.providers.google.cloud.sensors.dataform hooks: - integration-name: Google Ads @@ -785,6 +797,9 @@ hooks: - integration-name: Google Looker python-modules: - airflow.providers.google.cloud.hooks.looker + - integration-name: Google Cloud Dataform + python-modules: + - airflow.providers.google.cloud.hooks.dataform transfers: - source-integration-name: Presto @@ -930,6 +945,7 @@ extra-links: - airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink - airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink - airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink + - airflow.providers.google.cloud.links.dataform.DataformWorkflowInvocationLink - airflow.providers.google.cloud.operators.datafusion.DataFusionInstanceLink - airflow.providers.google.cloud.operators.datafusion.DataFusionPipelineLink - airflow.providers.google.cloud.operators.datafusion.DataFusionPipelinesLink diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataform.rst b/docs/apache-airflow-providers-google/operators/cloud/dataform.rst new file mode 100644 index 0000000000000..de9e5871a0a91 --- /dev/null +++ b/docs/apache-airflow-providers-google/operators/cloud/dataform.rst @@ -0,0 +1,110 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+
+Google Dataform Operators
+=========================
+
+Dataform is a service for data analysts to develop, test, version control, and schedule complex SQL
+workflows for data transformation in BigQuery.
+
+Dataform lets you manage data transformation in the Extraction, Loading, and Transformation (ELT) process
+for data integration. After raw data is extracted from source systems and loaded into BigQuery, Dataform
+helps you to transform it into a well-defined, tested, and documented suite of data tables.
+
+For more information about the service visit the
+`Dataform production documentation <https://cloud.google.com/dataform/docs>`__.
+
+
+Configuration
+-------------
+
+Before you can use the Dataform operators you need to initialize a repository and a workspace; for more
+information about this visit the `Dataform production documentation <https://cloud.google.com/dataform/docs>`__.
+A sketch of how this could be done with the Python client library is shown at the end of this page.
+
+
+Create Compilation Result
+-------------------------
+
+A simple configuration to create a Compilation Result can look as follows:
+
+:class:`~airflow.providers.google.cloud.operators.dataform.DataformCreateCompilationResultOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_create_compilation_result]
+    :end-before: [END howto_operator_create_compilation_result]
+
+Get Compilation Result
+----------------------
+
+To get a Compilation Result you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataform.DataformGetCompilationResultOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_get_compilation_result]
+    :end-before: [END howto_operator_get_compilation_result]
+
+Create Workflow Invocation
+--------------------------
+
+To create a Workflow Invocation you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataform.DataformCreateWorkflowInvocationOperator`
+
+This operation can run synchronously or asynchronously. For the asynchronous mode there is also
+a sensor (a sketch of its usage is shown at the end of this page):
+:class:`~airflow.providers.google.cloud.sensors.dataform.DataformWorkflowInvocationStateSensor`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_create_workflow_invocation]
+    :end-before: [END howto_operator_create_workflow_invocation]
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_create_workflow_invocation_async]
+    :end-before: [END howto_operator_create_workflow_invocation_async]
+
+Get Workflow Invocation
+-----------------------
+
+To get a Workflow Invocation you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataform.DataformGetWorkflowInvocationOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_get_workflow_invocation]
+    :end-before: [END howto_operator_get_workflow_invocation]
+
+Cancel Workflow Invocation
+--------------------------
+
+To cancel a Workflow Invocation you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataform.DataformCancelWorkflowInvocationOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_cancel_workflow_invocation]
+    :end-before: [END howto_operator_cancel_workflow_invocation]
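+
+Initialize Repository and Workspace
+-----------------------------------
+
+Repository and workspace creation is not covered by the operators in this release. Below is a minimal
+sketch of how both could be created with the ``google-cloud-dataform`` client library; it assumes the
+installed ``dataform_v1beta1`` client exposes the ``create_repository`` and ``create_workspace`` RPCs,
+and all resource names are illustrative only:
+
+.. code-block:: python
+
+    from google.cloud.dataform_v1beta1 import DataformClient
+
+    PROJECT_ID = "my-project"  # illustrative values
+    REGION = "us-central1"
+    REPOSITORY_ID = "my-repository"
+    WORKSPACE_ID = "my-workspace"
+
+    client = DataformClient()
+    parent = f"projects/{PROJECT_ID}/locations/{REGION}"
+
+    # Create an (empty) repository first, then a workspace attached to it.
+    repository = client.create_repository(
+        request={"parent": parent, "repository_id": REPOSITORY_ID, "repository": {}}
+    )
+    workspace = client.create_workspace(
+        request={"parent": repository.name, "workspace_id": WORKSPACE_ID, "workspace": {}}
+    )
+
+Waiting for a Workflow Invocation
+---------------------------------
+
+When ``DataformCreateWorkflowInvocationOperator`` runs with ``asynchronous=True`` it only submits the
+invocation, so a separate task has to wait for a terminal state. Below is a minimal sketch of the sensor
+wired up for this purpose (task and variable names are illustrative; ``failure_statuses`` makes the
+sensor fail fast instead of waiting for its own timeout):
+
+.. code-block:: python
+
+    from google.cloud.dataform_v1beta1 import WorkflowInvocation
+
+    from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
+
+    wait_for_invocation = DataformWorkflowInvocationStateSensor(
+        task_id="wait_for_invocation",
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        workflow_invocation_id=workflow_invocation_id,  # e.g. pulled from XCom
+        expected_statuses={WorkflowInvocation.State.SUCCEEDED},
+        failure_statuses={WorkflowInvocation.State.FAILED, WorkflowInvocation.State.CANCELLED},
+    )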
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index cb0efadb5c82c..868e3134bba34 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -99,6 +99,7 @@ DataTransferServiceClient
 Databricks
 Datadog
 Dataflow
+Dataform
 Dataframe
 Datalake
 Datanodes
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index dd06841df925c..e65ee42833e5b 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -306,6 +306,7 @@
     "google-cloud-build>=3.0.0",
     "google-cloud-container>=2.2.0,<3.0.0",
     "google-cloud-datacatalog>=3.0.0",
+    "google-cloud-dataform>=0.2.0",
     "google-cloud-dataplex>=0.1.0",
     "google-cloud-dataproc-metastore>=1.2.0,<2.0.0",
     "google-cloud-dataproc>=3.1.0",
diff --git a/tests/providers/google/cloud/hooks/test_dataform.py b/tests/providers/google/cloud/hooks/test_dataform.py
new file mode 100644
index 0000000000000..4312b550e64d8
--- /dev/null
+++ b/tests/providers/google/cloud/hooks/test_dataform.py
@@ -0,0 +1,157 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import unittest
+from unittest import mock
+
+from google.api_core.gapic_v1.method import DEFAULT
+
+from airflow.providers.google.cloud.hooks.dataform import DataformHook
+from tests.providers.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id
+
+BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
+DATAFORM_STRING = "airflow.providers.google.cloud.hooks.dataform.{}"
+
+PROJECT_ID = "project-id"
+REGION = "region"
+REPOSITORY_ID = "test_repository"
+WORKSPACE_ID = "test_workspace"
+GCP_CONN_ID = "google_cloud_default"
+DELEGATE_TO = "test-delegate-to"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+COMPILATION_RESULT = {
+    "git_commitish": "main",
+    "workspace": (
+        f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/workspaces/{WORKSPACE_ID}"
+    ),
+}
+COMPILATION_RESULT_ID = "test_compilation_result_id"
+WORKFLOW_INVOCATION = {
+    "compilation_result": (
+        f"projects/{PROJECT_ID}/locations/{REGION}/repositories/"
+        f"{REPOSITORY_ID}/compilationResults/{COMPILATION_RESULT_ID}"
+    ),
+}
+WORKFLOW_INVOCATION_ID = "test_workflow_invocation_id"
+
+
+class TestDataformHook(unittest.TestCase):
+    def setUp(self):
+        with mock.patch(
+            BASE_STRING.format("GoogleBaseHook.__init__"),
+            new=mock_base_gcp_hook_default_project_id,
+        ):
+            self.hook = DataformHook(
+                gcp_conn_id=GCP_CONN_ID,
+                delegate_to=DELEGATE_TO,
+                impersonation_chain=IMPERSONATION_CHAIN,
+            )
+
+    @mock.patch(DATAFORM_STRING.format("DataformHook.get_dataform_client"))
+    def test_create_compilation_result(self, mock_client):
+        self.hook.create_compilation_result(
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            compilation_result=COMPILATION_RESULT,
+        )
+        parent = f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}"
+        mock_client.return_value.create_compilation_result.assert_called_once_with(
+            request=dict(parent=parent, compilation_result=COMPILATION_RESULT),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAFORM_STRING.format("DataformHook.get_dataform_client"))
+    def test_get_compilation_result(self, mock_client):
+        self.hook.get_compilation_result(
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            compilation_result_id=COMPILATION_RESULT_ID,
+        )
+        name = (
+            f"projects/{PROJECT_ID}/locations/{REGION}/repositories/"
+            f"{REPOSITORY_ID}/compilationResults/{COMPILATION_RESULT_ID}"
+        )
+        mock_client.return_value.get_compilation_result.assert_called_once_with(
+            request=dict(
+                name=name,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAFORM_STRING.format("DataformHook.get_dataform_client"))
+    def test_create_workflow_invocation(self, mock_client):
+        self.hook.create_workflow_invocation(
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            workflow_invocation=WORKFLOW_INVOCATION,
+        )
+        parent = f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}"
+        mock_client.return_value.create_workflow_invocation.assert_called_once_with(
+            request=dict(parent=parent, workflow_invocation=WORKFLOW_INVOCATION),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAFORM_STRING.format("DataformHook.get_dataform_client"))
+    def test_get_workflow_invocation(self, mock_client):
+        self.hook.get_workflow_invocation(
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            workflow_invocation_id=WORKFLOW_INVOCATION_ID,
+        )
+        name = (
+            f"projects/{PROJECT_ID}/locations/{REGION}/repositories/"
+            
f"{REPOSITORY_ID}/workflowInvocations/{WORKFLOW_INVOCATION_ID}" + ) + mock_client.return_value.get_workflow_invocation.assert_called_once_with( + request=dict( + name=name, + ), + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + @mock.patch(DATAFORM_STRING.format("DataformHook.get_dataform_client")) + def test_cancel_workflow_invocation(self, mock_client): + self.hook.cancel_workflow_invocation( + project_id=PROJECT_ID, + region=REGION, + repository_id=REPOSITORY_ID, + workflow_invocation_id=WORKFLOW_INVOCATION_ID, + ) + name = ( + f"projects/{PROJECT_ID}/locations/{REGION}/repositories/" + f"{REPOSITORY_ID}/workflowInvocations/{WORKFLOW_INVOCATION_ID}" + ) + mock_client.return_value.cancel_workflow_invocation.assert_called_once_with( + request=dict( + name=name, + ), + retry=DEFAULT, + timeout=None, + metadata=(), + ) diff --git a/tests/providers/google/cloud/operators/test_dataform.py b/tests/providers/google/cloud/operators/test_dataform.py new file mode 100644 index 0000000000000..e3d4c1c22cc31 --- /dev/null +++ b/tests/providers/google/cloud/operators/test_dataform.py @@ -0,0 +1,176 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import unittest +from unittest import TestCase, mock + +from google.api_core.gapic_v1.method import DEFAULT + +from airflow.providers.google.cloud.operators.dataform import ( + DataformCancelWorkflowInvocationOperator, + DataformCreateCompilationResultOperator, + DataformCreateWorkflowInvocationOperator, + DataformGetCompilationResultOperator, + DataformGetWorkflowInvocationOperator, +) + +HOOK_STR = "airflow.providers.google.cloud.operators.dataform.DataformHook" +WORKFLOW_INVOCATION_STR = "airflow.providers.google.cloud.operators.dataform.WorkflowInvocation" +COMPILATION_RESULT_STR = "airflow.providers.google.cloud.operators.dataform.CompilationResult" + +PROJECT_ID = "project-id" +REGION = "region" +REPOSITORY_ID = "test_repository_id" +WORKSPACE_ID = "test_workspace_id" +WORKSPACE = f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/workspaces/{WORKSPACE_ID}" +COMPILATION_RESULT_ID = "test_compilation_result_id" +GCP_CONN_ID = "google_cloud_default" +DELEGATE_TO = "test-delegate-to" +IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"] + +WORKFLOW_INVOCATION = { + "compilation_result": ( + f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/" + f"compilationResults/{COMPILATION_RESULT_ID}" + ) +} +WORKFLOW_INVOCATION_ID = "test_workflow_invocation" + + +class TestDataformCreateCompilationResult(unittest.TestCase): + @mock.patch(HOOK_STR) + @mock.patch(COMPILATION_RESULT_STR) + def test_execute(self, compilation_result_mock, hook_mock): + compilation_result = { + "git_commitish": "main", + "workspace": WORKSPACE, + } + op = DataformCreateCompilationResultOperator( + task_id="create_compilation_result", + project_id=PROJECT_ID, + region=REGION, + repository_id=REPOSITORY_ID, + compilation_result=compilation_result, + ) + compilation_result_mock.return_value.to_dict.return_value = None + op.execute(context=mock.MagicMock()) + hook_mock.return_value.create_compilation_result.assert_called_once_with( + project_id=PROJECT_ID, + region=REGION, + repository_id=REPOSITORY_ID, + compilation_result=compilation_result, + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + +class TestDataformGetCompilationResultOperator(TestCase): + @mock.patch(HOOK_STR) + @mock.patch(COMPILATION_RESULT_STR) + def test_execute(self, compilation_result_mock, hook_mock): + op = DataformGetCompilationResultOperator( + task_id="get_compilation_result", + project_id=PROJECT_ID, + region=REGION, + repository_id=REPOSITORY_ID, + compilation_result_id=COMPILATION_RESULT_ID, + ) + compilation_result_mock.return_value.to_dict.return_value = None + op.execute(context=mock.MagicMock()) + hook_mock.return_value.get_compilation_result.assert_called_once_with( + project_id=PROJECT_ID, + region=REGION, + repository_id=REPOSITORY_ID, + compilation_result_id=COMPILATION_RESULT_ID, + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + +class TestDataformCreateWorkflowInvocationOperator(TestCase): + @mock.patch(HOOK_STR) + @mock.patch(WORKFLOW_INVOCATION_STR) + def test_execute(self, workflow_invocation_str, hook_mock): + op = DataformCreateWorkflowInvocationOperator( + task_id="create_workflow_invocation", + project_id=PROJECT_ID, + region=REGION, + repository_id=REPOSITORY_ID, + workflow_invocation=WORKFLOW_INVOCATION, + ) + workflow_invocation_str.return_value.to_dict.return_value = None + op.execute(context=mock.MagicMock()) + + hook_mock.return_value.create_workflow_invocation.assert_called_once_with( + project_id=PROJECT_ID, + region=REGION, + repository_id=REPOSITORY_ID, 
+ workflow_invocation=WORKFLOW_INVOCATION, + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + +class TestDataformGetWorkflowInvocationOperator(TestCase): + @mock.patch(HOOK_STR) + @mock.patch(WORKFLOW_INVOCATION_STR) + def test_execute(self, workflow_invocation_str, hook_mock): + op = DataformGetWorkflowInvocationOperator( + task_id="get_workflow_invocation", + project_id=PROJECT_ID, + region=REGION, + repository_id=REPOSITORY_ID, + workflow_invocation_id=WORKFLOW_INVOCATION_ID, + ) + + workflow_invocation_str.return_value.to_dict.return_value = None + op.execute(context=mock.MagicMock()) + + hook_mock.return_value.get_workflow_invocation.assert_called_once_with( + project_id=PROJECT_ID, + region=REGION, + repository_id=REPOSITORY_ID, + workflow_invocation_id=WORKFLOW_INVOCATION_ID, + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + +class TestDataformCancelWorkflowInvocationOperator(TestCase): + @mock.patch(HOOK_STR) + def test_execute(self, hook_mock): + op = DataformCancelWorkflowInvocationOperator( + task_id="cancel_workflow_invocation", + project_id=PROJECT_ID, + region=REGION, + repository_id=REPOSITORY_ID, + workflow_invocation_id=WORKFLOW_INVOCATION_ID, + ) + op.execute(context=mock.MagicMock()) + hook_mock.return_value.cancel_workflow_invocation.assert_called_once_with( + project_id=PROJECT_ID, + region=REGION, + repository_id=REPOSITORY_ID, + workflow_invocation_id=WORKFLOW_INVOCATION_ID, + retry=DEFAULT, + timeout=None, + metadata=(), + ) diff --git a/tests/system/providers/google/cloud/dataform/__init__.py b/tests/system/providers/google/cloud/dataform/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/system/providers/google/cloud/dataform/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/system/providers/google/cloud/dataform/example_dataform.py b/tests/system/providers/google/cloud/dataform/example_dataform.py new file mode 100644 index 0000000000000..d07d42898e997 --- /dev/null +++ b/tests/system/providers/google/cloud/dataform/example_dataform.py @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataform service
+"""
+import os
+from datetime import datetime
+
+from google.cloud.dataform_v1beta1 import WorkflowInvocation
+
+from airflow import models
+from airflow.models.baseoperator import chain
+from airflow.providers.google.cloud.operators.dataform import (
+    DataformCancelWorkflowInvocationOperator,
+    DataformCreateCompilationResultOperator,
+    DataformCreateWorkflowInvocationOperator,
+    DataformGetCompilationResultOperator,
+    DataformGetWorkflowInvocationOperator,
+)
+from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "test-project-id")
+
+DAG_ID = "dataform"
+
+REPOSITORY_ID = "dataform-test2"
+REGION = "us-central1"
+WORKSPACE_ID = "testing"
+
+# This DAG is not self-contained; some extra configuration is required to run it in an automated process.
+
+with models.DAG(
+    DAG_ID,
+    schedule_interval='@once',  # Override to match your needs
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example', 'dataform'],
+) as dag:
+    # [START howto_operator_create_compilation_result]
+    create_compilation_result = DataformCreateCompilationResultOperator(
+        task_id="create_compilation_result",
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        compilation_result={
+            "git_commitish": "main",
+            "workspace": (
+                f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
+                f"workspaces/{WORKSPACE_ID}"
+            ),
+        },
+    )
+    # [END howto_operator_create_compilation_result]
+
+    # [START howto_operator_get_compilation_result]
+    get_compilation_result = DataformGetCompilationResultOperator(
+        task_id="get_compilation_result",
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        compilation_result_id=(
+            "{{ task_instance.xcom_pull('create_compilation_result')['name'].split('/')[-1] }}"
+        ),
+    )
+    # [END howto_operator_get_compilation_result]
+
+    # [START howto_operator_create_workflow_invocation]
+    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
+        task_id='create_workflow_invocation',
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        workflow_invocation={
+            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
+        },
+    )
+    # [END howto_operator_create_workflow_invocation]
+
+    # [START howto_operator_create_workflow_invocation_async]
+    create_workflow_invocation_async = DataformCreateWorkflowInvocationOperator(
+        task_id='create_workflow_invocation_async',
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        asynchronous=True,
+        workflow_invocation={
+            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
+        },
+    )
+
+    is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
+        task_id="is_workflow_invocation_done",
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        workflow_invocation_id=(
+            "{{ task_instance.xcom_pull('create_workflow_invocation_async')['name'].split('/')[-1] }}"
+        ),
+        expected_statuses={WorkflowInvocation.State.SUCCEEDED},
+    )
+    # [END howto_operator_create_workflow_invocation_async]
+
+    # [START howto_operator_get_workflow_invocation]
+    get_workflow_invocation = DataformGetWorkflowInvocationOperator(
+        task_id='get_workflow_invocation',
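+        # The create task pushed WorkflowInvocation.to_dict() to XCom; the last path
+        # segment of its "name" field is the workflow invocation id used below.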
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        workflow_invocation_id=(
+            "{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"
+        ),
+    )
+    # [END howto_operator_get_workflow_invocation]
+
+    create_second_workflow_invocation = DataformCreateWorkflowInvocationOperator(
+        task_id='create_second_workflow_invocation',
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        workflow_invocation={
+            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
+        },
+    )
+
+    # [START howto_operator_cancel_workflow_invocation]
+    cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
+        task_id='cancel_workflow_invocation',
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        workflow_invocation_id=(
+            "{{ task_instance.xcom_pull('create_second_workflow_invocation')['name'].split('/')[-1] }}"
+        ),
+    )
+    # [END howto_operator_cancel_workflow_invocation]
+
+    chain(
+        create_compilation_result,
+        get_compilation_result,
+        create_workflow_invocation >> get_workflow_invocation,
+        create_workflow_invocation_async >> is_workflow_invocation_done,
+        create_second_workflow_invocation >> cancel_workflow_invocation,
+    )
+
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)