Skip to content

Commit 5bb09f6

Browse files
author
Bartlomiej Hirsz
committed
Migrate GCS sensor examples
Change-Id: Iec9be2369a93f2b9bab62f6e5eef298492852c90
1 parent 4c08fa6 commit 5bb09f6

File tree

2 files changed

+34
-4
lines changed

2 files changed

+34
-4
lines changed

tests/always/test_project_structure.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
214214
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeleteDeidentifyTemplateOperator',
215215
'airflow.providers.google.cloud.operators.dlp.CloudDLPListDLPJobsOperator',
216216
'airflow.providers.google.cloud.operators.dlp.CloudDLPRedactImageOperator',
217-
'airflow.providers.google.cloud.sensors.gcs.GCSObjectUpdateSensor',
218-
'airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor',
219217
}
220218

221219
def test_missing_example_for_operator(self):

tests/system/providers/google/gcs/example_gcs_sensor.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616
# specific language governing permissions and limitations
1717
# under the License.
1818
"""
19-
Example Airflow DAG for Google Cloud Storage GCSObjectExistenceSensor and
20-
GCSObjectsWithPrefixExistenceSensor sensors.
19+
Example Airflow DAG for Google Cloud Storage sensors.
2120
"""
2221

2322
import os
@@ -26,10 +25,13 @@
2625

2726
from airflow import models
2827
from airflow.models.baseoperator import chain
28+
from airflow.operators.bash import BashOperator
2929
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
3030
from airflow.providers.google.cloud.sensors.gcs import (
3131
GCSObjectExistenceSensor,
3232
GCSObjectsWithPrefixExistenceSensor,
33+
GCSObjectUpdateSensor,
34+
GCSUploadSessionCompleteSensor,
3335
)
3436
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
3537
from airflow.utils.trigger_rule import TriggerRule
@@ -55,6 +57,26 @@
5557
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
5658
)
5759

60+
# [START howto_sensor_gcs_upload_session_complete_task]
61+
gcs_upload_session_complete = GCSUploadSessionCompleteSensor(
62+
bucket=BUCKET_NAME,
63+
prefix=FILE_NAME,
64+
inactivity_period=15,
65+
min_objects=1,
66+
allow_delete=True,
67+
previous_objects=set(),
68+
task_id="gcs_upload_session_complete_task",
69+
)
70+
# [END howto_sensor_gcs_upload_session_complete_task]
71+
72+
# [START howto_sensor_object_update_exists_task]
73+
gcs_update_object_exists = GCSObjectUpdateSensor(
74+
bucket=BUCKET_NAME,
75+
object=FILE_NAME,
76+
task_id="gcs_object_update_sensor_task",
77+
)
78+
# [END howto_sensor_object_update_exists_task]
79+
5880
upload_file = LocalFilesystemToGCSOperator(
5981
task_id="upload_file",
6082
src=UPLOAD_FILE_PATH,
@@ -84,15 +106,25 @@
84106
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
85107
)
86108

109+
sleep = BashOperator(task_id='sleep', bash_command='sleep 5')
110+
87111
chain(
88112
# TEST SETUP
89113
create_bucket,
114+
sleep,
90115
upload_file,
91116
# TEST BODY
92117
[gcs_object_exists, gcs_object_with_prefix_exists],
93118
# TEST TEARDOWN
94119
delete_bucket,
95120
)
121+
chain(
122+
create_bucket,
123+
# TEST BODY
124+
gcs_upload_session_complete,
125+
gcs_update_object_exists,
126+
delete_bucket,
127+
)
96128

97129
from tests.system.utils.watcher import watcher
98130

0 commit comments

Comments
 (0)