|
16 | 16 | # specific language governing permissions and limitations
|
17 | 17 | # under the License.
|
18 | 18 | """
|
19 |
| -Example Airflow DAG for Google Cloud Storage GCSObjectExistenceSensor and |
20 |
| -GCSObjectsWithPrefixExistenceSensor sensors. |
| 19 | +Example Airflow DAG for Google Cloud Storage sensors. |
21 | 20 | """
|
22 | 21 |
|
23 | 22 | import os
|
|
26 | 25 |
|
27 | 26 | from airflow import models
|
28 | 27 | from airflow.models.baseoperator import chain
|
| 28 | +from airflow.operators.bash import BashOperator |
29 | 29 | from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
|
30 | 30 | from airflow.providers.google.cloud.sensors.gcs import (
|
31 | 31 | GCSObjectExistenceSensor,
|
32 | 32 | GCSObjectsWithPrefixExistenceSensor,
|
| 33 | + GCSObjectUpdateSensor, |
| 34 | + GCSUploadSessionCompleteSensor, |
33 | 35 | )
|
34 | 36 | from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
|
35 | 37 | from airflow.utils.trigger_rule import TriggerRule
|
|
55 | 57 | task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
|
56 | 58 | )
|
57 | 59 |
|
| 60 | + # [START howto_sensor_gcs_upload_session_complete_task] |
| 61 | + gcs_upload_session_complete = GCSUploadSessionCompleteSensor( |
| 62 | + bucket=BUCKET_NAME, |
| 63 | + prefix=FILE_NAME, |
| 64 | + inactivity_period=15, |
| 65 | + min_objects=1, |
| 66 | + allow_delete=True, |
| 67 | + previous_objects=set(), |
| 68 | + task_id="gcs_upload_session_complete_task", |
| 69 | + ) |
| 70 | + # [END howto_sensor_gcs_upload_session_complete_task] |
| 71 | + |
| 72 | + # [START howto_sensor_object_update_exists_task] |
| 73 | + gcs_update_object_exists = GCSObjectUpdateSensor( |
| 74 | + bucket=BUCKET_NAME, |
| 75 | + object=FILE_NAME, |
| 76 | + task_id="gcs_object_update_sensor_task", |
| 77 | + ) |
| 78 | + # [END howto_sensor_object_update_exists_task] |
| 79 | + |
58 | 80 | upload_file = LocalFilesystemToGCSOperator(
|
59 | 81 | task_id="upload_file",
|
60 | 82 | src=UPLOAD_FILE_PATH,
|
|
84 | 106 | task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
|
85 | 107 | )
|
86 | 108 |
|
| 109 | + sleep = BashOperator(task_id='sleep', bash_command='sleep 5') |
| 110 | + |
87 | 111 | chain(
|
88 | 112 | # TEST SETUP
|
89 | 113 | create_bucket,
|
| 114 | + sleep, |
90 | 115 | upload_file,
|
91 | 116 | # TEST BODY
|
92 | 117 | [gcs_object_exists, gcs_object_with_prefix_exists],
|
93 | 118 | # TEST TEARDOWN
|
94 | 119 | delete_bucket,
|
95 | 120 | )
|
| 121 | + chain( |
| 122 | + create_bucket, |
| 123 | + # TEST BODY |
| 124 | + gcs_upload_session_complete, |
| 125 | + gcs_update_object_exists, |
| 126 | + delete_bucket, |
| 127 | + ) |
96 | 128 |
|
97 | 129 | from tests.system.utils.watcher import watcher
|
98 | 130 |
|
|
0 commit comments