@@ -1202,23 +1202,9 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session:
1202
1202
1203
1203
log .info ("Attempting downgrade to revision %s" , to_revision )
1204
1204
config = _get_alembic_config ()
1205
- # Check if downgrade is less than 3.0.0 and requires that `ab_user` fab table is present
1205
+ # If downgrading to less than 3.0.0, we need to handle the FAB provider
1206
1206
if _revision_greater (config , _REVISION_HEADS_MAP ["2.10.3" ], to_revision ):
1207
- try :
1208
- from airflow .providers .fab .auth_manager .models .db import FABDBManager
1209
- except ImportError :
1210
- # Raise the error with a new message
1211
- raise RuntimeError (
1212
- "Import error occurred while importing FABDBManager. We need that to exist before we can "
1213
- "downgrade to <3.0.0"
1214
- )
1215
- dbm = FABDBManager (session )
1216
- if hasattr (dbm , "reset_to_2_x" ):
1217
- dbm .reset_to_2_x ()
1218
- else :
1219
- # Older version before we added that function, it only has a single migration so we can just
1220
- # created
1221
- dbm .create_db_from_orm ()
1207
+ _handle_fab_downgrade (session = session )
1222
1208
with create_global_lock (session = session , lock = DBLocks .MIGRATIONS ):
1223
1209
if show_sql_only :
1224
1210
log .warning ("Generating sql scripts for manual migration." )
@@ -1231,6 +1217,62 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session:
1231
1217
command .downgrade (config , revision = to_revision , sql = show_sql_only )
1232
1218
1233
1219
1220
+ def _get_fab_migration_version (* , session : Session ) -> str | None :
1221
+ """
1222
+ Get the current FAB migration version from the database.
1223
+
1224
+ This intentionally queries the db directly, as the FAB provider and FABDBManager may not even be installed.
1225
+
1226
+ :param session: sqlalchemy session for connection to airflow metadata database
1227
+ :return: The current FAB migration revision, or None if not found
1228
+ """
1229
+ try :
1230
+ result = session .execute (text ("SELECT version_num FROM alembic_version_fab LIMIT 1" ))
1231
+ row = result .fetchone ()
1232
+ return row [0 ] if row else None
1233
+ except Exception :
1234
+ # Table might not exist or other database error
1235
+ return None
1236
+
1237
+
1238
+ def _handle_fab_downgrade (* , session : Session ) -> None :
1239
+ """
1240
+ Handle FAB downgrade requirements for downgrades to Airflow versions < 3.0.0.
1241
+
1242
+ First, checks if the FAB db version matches the known version from 1.4.0.
1243
+ If it matches, no FAB db tables need to be touched.
1244
+ Otherwise, imports the FABDBManager and calls its downgrade method.
1245
+
1246
+ :param session: sqlalchemy session for connection to airflow metadata database
1247
+ :raises RuntimeError: If FAB provider is required but cannot be imported
1248
+ """
1249
+ fab_version = _get_fab_migration_version (session = session )
1250
+ if fab_version == "6709f7a774b9" : # 1.4.0
1251
+ # FAB version matches - we can proceed without touching the FAB db tables
1252
+ log .info (
1253
+ "FAB migration version %s matches known version from 1.4.0. "
1254
+ "FAB provider is not required for downgrade." ,
1255
+ fab_version ,
1256
+ )
1257
+ return
1258
+
1259
+ # FAB db version is different or not found - require the FAB provider
1260
+ try :
1261
+ from airflow .providers .fab .auth_manager .models .db import FABDBManager
1262
+ except ImportError :
1263
+ raise RuntimeError (
1264
+ "Import error occurred while importing FABDBManager. The apache-airflow-provider-fab package must be installed before we can "
1265
+ "downgrade to <3.0.0."
1266
+ )
1267
+ dbm = FABDBManager (session )
1268
+ if hasattr (dbm , "reset_to_2_x" ):
1269
+ dbm .reset_to_2_x ()
1270
+ else :
1271
+ # Older version before we added that function, it only has a single migration so we can just create the tables
1272
+ # to ensure they are there
1273
+ dbm .create_db_from_orm ()
1274
+
1275
+
1234
1276
def drop_airflow_models (connection ):
1235
1277
"""
1236
1278
Drop all airflow models.
0 commit comments