3535 a table in the airflow metadata database
3636 - age_check_column: Column in the model/table to use for calculating max
3737 date of data deletion
38- - keep_last: Boolean to specify whether to preserve last run instance
39- - keep_last_filters: List of filters to preserve data from deleting
40- during clean-up, such as DAG runs where the external trigger is set
41- to 0.
42- - keep_last_group_by: Option to specify column by which to group the
43- database entries and perform aggregate functions.
4438
45393. Create and Set the following Variables in the Airflow Web Server
4640 (Admin -> Variables)
7165from airflow .version import version as airflow_version
7266
7367import dateutil .parser
74- from sqlalchemy import and_ , func ,text
68+ from sqlalchemy import desc , sql ,text
7569from sqlalchemy .exc import ProgrammingError
7670
7771now = timezone .utcnow
@@ -282,7 +276,9 @@ def print_configuration_function(**context):
282276logging .info ("dag_run.conf: " + str (dag_run_conf ))
283277max_db_entry_age_in_days = None
284278if dag_run_conf :
285- max_db_entry_age_in_days = dag_run_conf .get ("maxDBEntryAgeInDays" ,None )
279+ max_db_entry_age_in_days = dag_run_conf .get (
280+ "maxDBEntryAgeInDays" ,None
281+ )
286282logging .info ("maxDBEntryAgeInDays from dag_run.conf: " + str (dag_run_conf ))
287283if max_db_entry_age_in_days is None or max_db_entry_age_in_days < 1 :
288284logging .info (
@@ -319,37 +315,51 @@ def build_query(
319315airflow_db_model ,
320316age_check_column ,
321317max_date ,
322- keep_last ,
323- keep_last_filters = None ,
324- keep_last_group_by = None ,
318+ dag_id = None
325319):
320+ """
321+ Build a database query to retrieve and filter Airflow data.
322+
323+ Args:
324+ session: SQLAlchemy session object for database interaction.
325+ airflow_db_model: The Airflow model class to query (e.g., DagRun).
326+ age_check_column: The column representing the age of the data.
327+ max_date: Cutoff date; rows whose age_check_column is on or before it match.
328+ dag_id (optional): The ID of the DAG to filter by. Defaults to None.
329+
330+ Returns:
331+ SQLAlchemy query object: The constructed query.
332+ """
326333query = session .query (airflow_db_model )
327334
328335logging .info ("INITIAL QUERY : " + str (query ))
329336
330- if not keep_last :
331- query = query .filter (
332- age_check_column <= max_date ,
337+ if dag_id :
338+ query = query .filter (airflow_db_model .dag_id == dag_id )
339+
340+ if airflow_db_model == DagRun :
341+ # For DagRuns we want to leave last DagRun regardless of its age
342+ newest_dagrun = (
343+ session
344+ .query (airflow_db_model )
345+ .filter (airflow_db_model .dag_id == dag_id )
346+ .order_by (desc (airflow_db_model .execution_date ))
347+ .first ()
333348 )
349+ logging .info ("Newest dagrun: " + str (newest_dagrun ))
350+ if newest_dagrun is not None :
351+ query = (
352+ query
353+ .filter (DagRun .external_trigger .is_ (False ))
354+ .filter (age_check_column <= max_date )
355+ .filter (airflow_db_model .id != newest_dagrun .id )
356+ )
357+ else :
358+ query = query .filter (sql .false ())
334359else :
335- subquery = session .query (func .max (DagRun .execution_date ))
336- # workaround for MySQL "table specified twice" issue
337- # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
338- if keep_last_filters is not None :
339- for entry in keep_last_filters :
340- subquery = subquery .filter (entry )
341-
342- logging .info ("SUB QUERY [keep_last_filters]: " + str (subquery ))
343-
344- if keep_last_group_by is not None :
345- subquery = subquery .group_by (keep_last_group_by )
346- logging .info ("SUB QUERY [keep_last_group_by]: " + str (subquery ))
360+ query = query .filter (age_check_column <= max_date )
347361
348- subquery = subquery .from_self ()
349-
350- query = query .filter (
351- and_ (age_check_column .notin_ (subquery )),and_ (age_check_column <= max_date )
352- )
362+ logging .info ("FINAL QUERY: " + str (query ))
353363
354364return query
355365
@@ -410,13 +420,10 @@ def cleanup_function(**context):
410420try :
411421if context ["params" ].get ("do_not_delete_by_dag_id" ):
412422query = build_query (
413- session ,
414- airflow_db_model ,
415- age_check_column ,
416- max_date ,
417- keep_last ,
418- keep_last_filters ,
419- keep_last_group_by ,
423+ session = session ,
424+ airflow_db_model = airflow_db_model ,
425+ age_check_column = age_check_column ,
426+ max_date = max_date ,
420427 )
421428if PRINT_DELETES :
422429print_query (query ,airflow_db_model ,age_check_column )
@@ -429,17 +436,14 @@ def cleanup_function(**context):
429436session .commit ()
430437
431438list_dags = [str (list (dag )[0 ])for dag in dags ]+ [None ]
432- for dag in list_dags :
439+ for dag_id in list_dags :
433440query = build_query (
434- session ,
435- airflow_db_model ,
436- age_check_column ,
437- max_date ,
438- keep_last ,
439- keep_last_filters ,
440- keep_last_group_by ,
441+ session = session ,
442+ airflow_db_model = airflow_db_model ,
443+ age_check_column = age_check_column ,
444+ max_date = max_date ,
445+ dag_id = dag_id ,
441446 )
442- query = query .filter (airflow_db_model .dag_id == dag )
443447if PRINT_DELETES :
444448print_query (query ,airflow_db_model ,age_check_column )
445449if ENABLE_DELETE :
@@ -448,7 +452,7 @@ def cleanup_function(**context):
448452session .commit ()
449453
450454if not ENABLE_DELETE :
451- logging .warn (
455+ logging .warning (
452456"You've opted to skip deleting the db entries. "
453457"Set ENABLE_DELETE to True to delete entries!!!"
454458 )
@@ -458,7 +462,9 @@ def cleanup_function(**context):
458462except ProgrammingError as e :
459463logging .error (e )
460464logging .error (
461- str (airflow_db_model )+ " is not present in the metadata." + "Skipping..."
465+ str (airflow_db_model )+
466+ " is not present in the metadata." +
467+ "Skipping..."
462468 )
463469
464470finally :
@@ -471,10 +477,15 @@ def cleanup_sessions():
471477try :
472478logging .info ("Deleting sessions..." )
473479count_statement = (
474- "SELECT COUNT(*) AS cnt FROM session WHERE expiry < now()::timestamp(0);"
480+ "SELECT COUNT(*) AS cnt FROM session " +
481+ "WHERE expiry < now()::timestamp(0);"
475482 )
476483before = session .execute (text (count_statement )).one_or_none ()["cnt" ]
477- session .execute (text ("DELETE FROM session WHERE expiry < now()::timestamp(0);" ))
484+ session .execute (
485+ text (
486+ "DELETE FROM session WHERE expiry < now()::timestamp(0);"
487+ )
488+ )
478489after = session .execute (text (count_statement )).one_or_none ()["cnt" ]
479490logging .info ("Deleted %s expired sessions." , (before - after ))
480491except Exception as err :
@@ -492,7 +503,10 @@ def analyze_db():
492503
493504
494505analyze_op = PythonOperator (
495- task_id = "analyze_query" ,python_callable = analyze_db ,provide_context = True ,dag = dag
506+ task_id = "analyze_query" ,
507+ python_callable = analyze_db ,
508+ provide_context = True ,
509+ dag = dag
496510)
497511
498512cleanup_session_op = PythonOperator (