Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit bc8ac22

Browse files
authored
Update Maintenance DAG (GoogleCloudPlatform#12768)
* FIX: In the previous version, removing DagRuns might have skipped some DagRuns if the scheduled time was identical for multiple DagRuns.
* FIX: Linter.
* FIX: Linter.
* FIX: Logging.
* Compare against DagRuns of the current DAG only.
* Update airflow_db_cleanup.py: add None handling and make all conditions <=.
* Linter update.
* Fixes from review.
* FIX: Add filter by dag_id for newest_dagrun + logs.
* FIX: Linter.
* FIX: Linter.
1 parent 0fe057f · commit bc8ac22

File tree

1 file changed

+66
-52
lines changed

1 file changed

+66
-52
lines changed

‎composer/workflows/airflow_db_cleanup.py‎

Lines changed: 66 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,6 @@
3535
a table in the airflow metadata database
3636
- age_check_column: Column in the model/table to use for calculating max
3737
date of data deletion
38-
- keep_last: Boolean to specify whether to preserve last run instance
39-
- keep_last_filters: List of filters to preserve data from deleting
40-
during clean-up, such as DAG runs where the external trigger is set
41-
to 0.
42-
- keep_last_group_by: Option to specify column by which to group the
43-
database entries and perform aggregate functions.
4438
4539
3. Create and Set the following Variables in the Airflow Web Server
4640
(Admin -> Variables)
@@ -71,7 +65,7 @@
7165
fromairflow.versionimportversionasairflow_version
7266

7367
importdateutil.parser
74-
fromsqlalchemyimportand_,func,text
68+
fromsqlalchemyimportdesc,sql,text
7569
fromsqlalchemy.excimportProgrammingError
7670

7771
now=timezone.utcnow
@@ -282,7 +276,9 @@ def print_configuration_function(**context):
282276
logging.info("dag_run.conf: "+str(dag_run_conf))
283277
max_db_entry_age_in_days=None
284278
ifdag_run_conf:
285-
max_db_entry_age_in_days=dag_run_conf.get("maxDBEntryAgeInDays",None)
279+
max_db_entry_age_in_days=dag_run_conf.get(
280+
"maxDBEntryAgeInDays",None
281+
)
286282
logging.info("maxDBEntryAgeInDays from dag_run.conf: "+str(dag_run_conf))
287283
ifmax_db_entry_age_in_daysisNoneormax_db_entry_age_in_days<1:
288284
logging.info(
@@ -319,37 +315,51 @@ def build_query(
319315
airflow_db_model,
320316
age_check_column,
321317
max_date,
322-
keep_last,
323-
keep_last_filters=None,
324-
keep_last_group_by=None,
318+
dag_id=None
325319
):
320+
"""
321+
Build a database query to retrieve and filter Airflow data.
322+
323+
Args:
324+
session: SQLAlchemy session object for database interaction.
325+
airflow_db_model: The Airflow model class to query (e.g., DagRun).
326+
age_check_column: The column representing the age of the data.
327+
max_date: The maximum allowed age for the data.
328+
dag_id (optional): The ID of the DAG to filter by. Defaults to None.
329+
330+
Returns:
331+
SQLAlchemy query object: The constructed query.
332+
"""
326333
query=session.query(airflow_db_model)
327334

328335
logging.info("INITIAL QUERY : "+str(query))
329336

330-
ifnotkeep_last:
331-
query=query.filter(
332-
age_check_column<=max_date,
337+
ifdag_id:
338+
query=query.filter(airflow_db_model.dag_id==dag_id)
339+
340+
ifairflow_db_model==DagRun:
341+
# For DagRuns we want to leave the last DagRun regardless of its age
342+
newest_dagrun= (
343+
session
344+
.query(airflow_db_model)
345+
.filter(airflow_db_model.dag_id==dag_id)
346+
.order_by(desc(airflow_db_model.execution_date))
347+
.first()
333348
)
349+
logging.info("Newest dagrun: "+str(newest_dagrun))
350+
ifnewest_dagrunisnotNone:
351+
query= (
352+
query
353+
.filter(DagRun.external_trigger.is_(False))
354+
.filter(age_check_column<=max_date)
355+
.filter(airflow_db_model.id!=newest_dagrun.id)
356+
)
357+
else:
358+
query=query.filter(sql.false())
334359
else:
335-
subquery=session.query(func.max(DagRun.execution_date))
336-
# workaround for MySQL "table specified twice" issue
337-
# https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
338-
ifkeep_last_filtersisnotNone:
339-
forentryinkeep_last_filters:
340-
subquery=subquery.filter(entry)
341-
342-
logging.info("SUB QUERY [keep_last_filters]: "+str(subquery))
343-
344-
ifkeep_last_group_byisnotNone:
345-
subquery=subquery.group_by(keep_last_group_by)
346-
logging.info("SUB QUERY [keep_last_group_by]: "+str(subquery))
360+
query=query.filter(age_check_column<=max_date)
347361

348-
subquery=subquery.from_self()
349-
350-
query=query.filter(
351-
and_(age_check_column.notin_(subquery)),and_(age_check_column<=max_date)
352-
)
362+
logging.info("FINAL QUERY: "+str(query))
353363

354364
returnquery
355365

@@ -410,13 +420,10 @@ def cleanup_function(**context):
410420
try:
411421
ifcontext["params"].get("do_not_delete_by_dag_id"):
412422
query=build_query(
413-
session,
414-
airflow_db_model,
415-
age_check_column,
416-
max_date,
417-
keep_last,
418-
keep_last_filters,
419-
keep_last_group_by,
423+
session=session,
424+
airflow_db_model=airflow_db_model,
425+
age_check_column=age_check_column,
426+
max_date=max_date,
420427
)
421428
ifPRINT_DELETES:
422429
print_query(query,airflow_db_model,age_check_column)
@@ -429,17 +436,14 @@ def cleanup_function(**context):
429436
session.commit()
430437

431438
list_dags= [str(list(dag)[0])fordagindags]+ [None]
432-
fordaginlist_dags:
439+
fordag_idinlist_dags:
433440
query=build_query(
434-
session,
435-
airflow_db_model,
436-
age_check_column,
437-
max_date,
438-
keep_last,
439-
keep_last_filters,
440-
keep_last_group_by,
441+
session=session,
442+
airflow_db_model=airflow_db_model,
443+
age_check_column=age_check_column,
444+
max_date=max_date,
445+
dag_id=dag_id,
441446
)
442-
query=query.filter(airflow_db_model.dag_id==dag)
443447
ifPRINT_DELETES:
444448
print_query(query,airflow_db_model,age_check_column)
445449
ifENABLE_DELETE:
@@ -448,7 +452,7 @@ def cleanup_function(**context):
448452
session.commit()
449453

450454
ifnotENABLE_DELETE:
451-
logging.warn(
455+
logging.warning(
452456
"You've opted to skip deleting the db entries. "
453457
"Set ENABLE_DELETE to True to delete entries!!!"
454458
)
@@ -458,7 +462,9 @@ def cleanup_function(**context):
458462
exceptProgrammingErrorase:
459463
logging.error(e)
460464
logging.error(
461-
str(airflow_db_model)+" is not present in the metadata."+"Skipping..."
465+
str(airflow_db_model)+
466+
" is not present in the metadata."+
467+
"Skipping..."
462468
)
463469

464470
finally:
@@ -471,10 +477,15 @@ def cleanup_sessions():
471477
try:
472478
logging.info("Deleting sessions...")
473479
count_statement= (
474-
"SELECT COUNT(*) AS cnt FROM session WHERE expiry < now()::timestamp(0);"
480+
"SELECT COUNT(*) AS cnt FROM session "+
481+
"WHERE expiry < now()::timestamp(0);"
475482
)
476483
before=session.execute(text(count_statement)).one_or_none()["cnt"]
477-
session.execute(text("DELETE FROM session WHERE expiry < now()::timestamp(0);"))
484+
session.execute(
485+
text(
486+
"DELETE FROM session WHERE expiry < now()::timestamp(0);"
487+
)
488+
)
478489
after=session.execute(text(count_statement)).one_or_none()["cnt"]
479490
logging.info("Deleted %s expired sessions.", (before-after))
480491
exceptExceptionaserr:
@@ -492,7 +503,10 @@ def analyze_db():
492503

493504

494505
analyze_op=PythonOperator(
495-
task_id="analyze_query",python_callable=analyze_db,provide_context=True,dag=dag
506+
task_id="analyze_query",
507+
python_callable=analyze_db,
508+
provide_context=True,
509+
dag=dag
496510
)
497511

498512
cleanup_session_op=PythonOperator(

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp