Skip to content

Commit 4d724b1

Browse files
Allow downgrades to Airflow 2 without FAB provider (#55231)
We don't "always" need the FAB provider, we just need it if we don't have the db at the version we expect. This can be because 1) we upgraded from AF2 and the tables still exist or 2) the instance had the FAB provider in the past but doesn't any longer. In either case though, we have the tables, and at the version we expect anyway, so no need to require the FAB provider to be installed before the downgrade.
1 parent ed3d3e1 commit 4d724b1

File tree

1 file changed

+58
-16
lines changed
  • airflow-core/src/airflow/utils

1 file changed

+58
-16
lines changed

airflow-core/src/airflow/utils/db.py

Lines changed: 58 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1202,23 +1202,9 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session:
12021202

12031203
log.info("Attempting downgrade to revision %s", to_revision)
12041204
config = _get_alembic_config()
1205-
# Check if downgrade is less than 3.0.0 and requires that `ab_user` fab table is present
1205+
# If downgrading to less than 3.0.0, we need to handle the FAB provider
12061206
if _revision_greater(config, _REVISION_HEADS_MAP["2.10.3"], to_revision):
1207-
try:
1208-
from airflow.providers.fab.auth_manager.models.db import FABDBManager
1209-
except ImportError:
1210-
# Raise the error with a new message
1211-
raise RuntimeError(
1212-
"Import error occurred while importing FABDBManager. We need that to exist before we can "
1213-
"downgrade to <3.0.0"
1214-
)
1215-
dbm = FABDBManager(session)
1216-
if hasattr(dbm, "reset_to_2_x"):
1217-
dbm.reset_to_2_x()
1218-
else:
1219-
# Older version before we added that function, it only has a single migration so we can just
1220-
# created
1221-
dbm.create_db_from_orm()
1207+
_handle_fab_downgrade(session=session)
12221208
with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
12231209
if show_sql_only:
12241210
log.warning("Generating sql scripts for manual migration.")
@@ -1231,6 +1217,62 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session:
12311217
command.downgrade(config, revision=to_revision, sql=show_sql_only)
12321218

12331219

1220+
def _get_fab_migration_version(*, session: Session) -> str | None:
1221+
"""
1222+
Get the current FAB migration version from the database.
1223+
1224+
This intentionally queries the db directly, as the FAB provider and FABDBManager may not even be installed.
1225+
1226+
:param session: sqlalchemy session for connection to airflow metadata database
1227+
:return: The current FAB migration revision, or None if not found
1228+
"""
1229+
try:
1230+
result = session.execute(text("SELECT version_num FROM alembic_version_fab LIMIT 1"))
1231+
row = result.fetchone()
1232+
return row[0] if row else None
1233+
except Exception:
1234+
# Table might not exist or other database error
1235+
return None
1236+
1237+
1238+
def _handle_fab_downgrade(*, session: Session) -> None:
1239+
"""
1240+
Handle FAB downgrade requirements for downgrades to Airflow versions < 3.0.0.
1241+
1242+
First, checks if the FAB db version matches the known version from 1.4.0.
1243+
If it matches, no FAB db tables need to be touched.
1244+
Otherwise, imports the FABDBManager and calls its downgrade method.
1245+
1246+
:param session: sqlalchemy session for connection to airflow metadata database
1247+
:raises RuntimeError: If FAB provider is required but cannot be imported
1248+
"""
1249+
fab_version = _get_fab_migration_version(session=session)
1250+
if fab_version == "6709f7a774b9": # 1.4.0
1251+
# FAB version matches - we can proceed without touching the FAB db tables
1252+
log.info(
1253+
"FAB migration version %s matches known version from 1.4.0. "
1254+
"FAB provider is not required for downgrade.",
1255+
fab_version,
1256+
)
1257+
return
1258+
1259+
# FAB db version is different or not found - require the FAB provider
1260+
try:
1261+
from airflow.providers.fab.auth_manager.models.db import FABDBManager
1262+
except ImportError:
1263+
raise RuntimeError(
1264+
"Import error occurred while importing FABDBManager. The apache-airflow-provider-fab package must be installed before we can "
1265+
"downgrade to <3.0.0."
1266+
)
1267+
dbm = FABDBManager(session)
1268+
if hasattr(dbm, "reset_to_2_x"):
1269+
dbm.reset_to_2_x()
1270+
else:
1271+
# Older version before we added that function, it only has a single migration so we can just create the tables
1272+
# to ensure they are there
1273+
dbm.create_db_from_orm()
1274+
1275+
12341276
def drop_airflow_models(connection):
12351277
"""
12361278
Drop all airflow models.

0 commit comments

Comments
 (0)