@luisglft commented Jul 9, 2024

This PR implements a new callback, on_state_change_callback, that is called every time Airflow updates a task's state. It will be used together with task_lifecycle to record these events.

The previous implementation was based on the existing Airflow callbacks:

  1. on_success_callback
  2. on_failure_callback
  3. sla_miss_callback
  4. on_retry_callback
  5. on_execute_callback
  6. on_skipped_callback

However, these callbacks are not enough to catch every state transition. With the new callback, consumers only need to listen to a single callback, which receives all the information of the TaskInstance, including its latest state.
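A minimal sketch of the intended usage, assuming the callback is called with the TaskInstance whose state just changed (the description does not show the exact signature). TaskInstanceStub stands in for Airflow's TaskInstance and lifecycle_log for the task_lifecycle store; both are hypothetical. Each transition produces exactly one event, so no per-state callback is needed:

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


@dataclass
class TaskInstanceStub:
    """Hypothetical stand-in for Airflow's TaskInstance (only a few fields)."""

    dag_id: str
    task_id: str
    run_id: str
    try_number: int
    state: Optional[str] = None


# Hypothetical in-memory substitute for the task_lifecycle store.
lifecycle_log: List[Dict[str, Any]] = []


def on_state_change_callback(ti: TaskInstanceStub) -> None:
    """Append one lifecycle event for the state the task instance just entered.

    Assumption: the callback receives the task instance whose state was
    updated; the real signature may differ.
    """
    lifecycle_log.append(
        {
            "dag_id": ti.dag_id,
            "task_id": ti.task_id,
            "run_id": ti.run_id,
            "try_number": ti.try_number,
            "state": ti.state,
            "recorded_at": datetime.now(timezone.utc),
        }
    )


# Simulate a task instance moving through a typical successful run.
ti = TaskInstanceStub("example_dag", "extract", "manual__2024-07-09", try_number=1)
for new_state in ["scheduled", "queued", "running", "success"]:
    ti.state = new_state
    on_state_change_callback(ti)

print([event["state"] for event in lifecycle_log])
# ['scheduled', 'queued', 'running', 'success']
```

Because the event carries the full task-instance context, the consumer does not need to know which Airflow code path triggered it.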

The callback is invoked from several places in Airflow, listed below by file and by the state being set.
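Because the callback has to be fired from many call sites, one way to keep them consistent is a small shared helper that updates the state and then notifies the listener. The sketch below is illustrative rather than the PR's code: set_state_and_notify and TaskInstanceStub are hypothetical, and skipping same-state updates and swallowing listener errors are assumptions about desirable behavior.

```python
import logging
from typing import Callable, List, Optional

log = logging.getLogger(__name__)


class TaskInstanceStub:
    """Hypothetical minimal task instance; not Airflow's class."""

    def __init__(self, task_id: str, state: Optional[str] = None) -> None:
        self.task_id = task_id
        self.state = state


StateChangeCallback = Callable[[TaskInstanceStub], None]


def set_state_and_notify(
    ti: TaskInstanceStub,
    new_state: Optional[str],
    callback: Optional[StateChangeCallback],
) -> bool:
    """Set ti.state and invoke the callback; return True if the state changed.

    Hypothetical helper illustrating one way call sites could share logic.
    """
    if ti.state == new_state:
        return False  # assumption: re-setting the same state emits no event
    ti.state = new_state
    if callback is not None:
        try:
            callback(ti)
        except Exception:
            # Assumption: a failing listener must not block the state update.
            log.exception("on_state_change_callback failed for %s", ti.task_id)
    return True


seen: List[Optional[str]] = []


def record(t: TaskInstanceStub) -> None:
    """Tiny demo listener that remembers every state it is told about."""
    seen.append(t.state)


ti = TaskInstanceStub("load")
set_state_and_notify(ti, "queued", record)
set_state_and_notify(ti, "queued", record)  # same state: no event
set_state_and_notify(ti, "running", record)
print(seen)  # ['queued', 'running']
```

Centralizing the notification this way means a new call site cannot forget to fire the callback, at the cost of routing every state write through one function.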

Call sites of on_state_change_callback:

taskinstance.py

State -- Method

  • SUCCESS -- _run_finished_callback
  • RUNNING -- check_and_change_state_before_execution
  • FAILED -- _run_finished_callback, set_state
  • QUEUED -- _process_executor_events
  • SHUTDOWN -- not called anywhere
  • RESTARTING -- clear_task_instances
  • UP_FOR_RETRY -- _run_finished_callback
  • UP_FOR_RESCHEDULE -- check_and_change_state_before_execution
  • UPSTREAM_FAILED -- set_state
  • SKIPPED -- set_state
  • SENSING -- _update_ti_state_for_sensing
  • DEFERRED -- _defer_task

dagrun.py

State -- Method

  • NONE -- _check_for_removed_or_restored_tasks
  • REMOVED -- _check_for_removed_or_restored_tasks
  • SCHEDULED -- schedule_tis
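The two call-site lists can also be kept as data, which makes gaps easy to spot when new states or code paths are added. In this hypothetical encoding (CALL_SITES and the helper functions are not part of the PR), SHUTDOWN is the only state with no method that fires the callback:

```python
from typing import Dict, List

# State -> methods that fire on_state_change_callback for that state,
# copied from the lists above. The structure itself is hypothetical.
CALL_SITES: Dict[str, List[str]] = {
    # taskinstance.py
    "SUCCESS": ["_run_finished_callback"],
    "RUNNING": ["check_and_change_state_before_execution"],
    "FAILED": ["_run_finished_callback", "set_state"],
    "QUEUED": ["_process_executor_events"],
    "SHUTDOWN": [],
    "RESTARTING": ["clear_task_instances"],
    "UP_FOR_RETRY": ["_run_finished_callback"],
    "UP_FOR_RESCHEDULE": ["check_and_change_state_before_execution"],
    "UPSTREAM_FAILED": ["set_state"],
    "SKIPPED": ["set_state"],
    "SENSING": ["_update_ti_state_for_sensing"],
    "DEFERRED": ["_defer_task"],
    # dagrun.py
    "NONE": ["_check_for_removed_or_restored_tasks"],
    "REMOVED": ["_check_for_removed_or_restored_tasks"],
    "SCHEDULED": ["schedule_tis"],
}


def states_without_call_site(call_sites: Dict[str, List[str]]) -> List[str]:
    """Return the states for which no method fires the callback."""
    return sorted(state for state, methods in call_sites.items() if not methods)


def states_set_by(method: str, call_sites: Dict[str, List[str]]) -> List[str]:
    """Return the states for which the given method fires the callback."""
    return sorted(state for state, methods in call_sites.items() if method in methods)


print(states_without_call_site(CALL_SITES))  # ['SHUTDOWN']
print(states_set_by("set_state", CALL_SITES))  # ['FAILED', 'SKIPPED', 'UPSTREAM_FAILED']
```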
