Apache Airflow is a relatively new, yet immensely popular, tool for scheduling workflows and automating tasks in a production environment. With Airflow, one can create Directed Acyclic Graphs, commonly called DAGs, which are used for representing these workflows graphically. The underlying code is written in Python, which relies on calling libraries for scheduling tasks periodically.

One of the common challenges with any job scheduler is identifying workflows that run longer than their average run time. This is particularly important for the production team supporting day-to-day operations, because there is always a chance of missing that one nasty job out of the hundreds of jobs running in a live environment. This can even lead to a business SLA breach. You can solve this issue with proactive monitoring.

Airflow jobs are tracked by underlying relational tables that maintain the log for all the DAGs, the tasks within each DAG, running statistics, running status, etc. You can query these tables using SQL statements through the Data Profiler option (under Ad-Hoc Query) in Airflow version 1.10.15+; in later versions of Airflow, the Data Profiler is disabled. In the following example, the database is airflow_db and the relevant table is dag_run. The snapshots give a view of all the DAGs that are currently running, and the output shows all the column information for the DAG airflow_monitoring.

Therefore, I took advantage of the above-mentioned airflow_db database to figure out which DAGs are running longer than their usual run time. The basic idea is to execute a background process against these tables and find the DAGs that are in the execution state. For such DAGs in execution, I compared the current run time to the average run time. If the current run time is more than the average run time by a considerable margin, then we have a DAG of interest which needs to be investigated. There's a caveat here: the average run time for each DAG is calculated beforehand and stored in another manually created table. This entire process is encapsulated within a new DAG that monitors the environment in which it is deployed and runs on a regular basis; in this case, I have set it to run hourly.

Following are the two DAGs created for overcoming the above problem:

1. load_data_from_airflow_to_bq – (Monthly DAG Run)
a. I've used the airflow_db database's dag_run table to get all the individual successful DAG run jobs in the past 30 days.
b. Then, I ran the following MySQL query to get the average run time of every DAG:

```sql
SELECT a.dag_id, AVG(diff_time)
FROM (
    SELECT dag_id, start_date, end_date,
           TIMESTAMPDIFF(SECOND, start_date, end_date) AS diff_time
    FROM dag_run
    WHERE state = "success"
      AND start_date > NOW() - INTERVAL 30 DAY
    ORDER BY start_date DESC
) a
GROUP BY dag_id
```

c. Then, I stored the results in a list of lists holding the average DAG run data.
d. I've scheduled this job to run monthly, so that fresh average DAG run data is collected every month.

The choice to revisit this process monthly keeps the average run time information updated in a regular manner. It maintains the historical run time of each DAG based on recent history, reflecting factors such as code updates, the environment in which the DAG is running, etc. A minimal sketch of what this monthly loader could look like is shown below.
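The original post does not include the DAG code itself, so the following is only a rough sketch of how such a monthly loader might be written. It assumes Airflow 2-style imports (the 1.10 import paths differ slightly), a MySQL connection ID named airflow_db pointing at the Airflow metadata database, and a hypothetical BigQuery table your-project.monitoring.dag_avg_runtime; none of these names come from the post, so adjust them to your environment.

```python
# Hypothetical sketch: load 30-day average DAG run times from the Airflow
# metadata DB (MySQL) into a BigQuery table. The connection ID, project,
# dataset, and table names are assumptions, not taken from the original post.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from google.cloud import bigquery

AVG_RUNTIME_SQL = """
SELECT a.dag_id, AVG(diff_time) AS avg_runtime
FROM (
    SELECT dag_id,
           TIMESTAMPDIFF(SECOND, start_date, end_date) AS diff_time
    FROM dag_run
    WHERE state = 'success'
      AND start_date > NOW() - INTERVAL 30 DAY
) a
GROUP BY a.dag_id
"""

BQ_TABLE = "your-project.monitoring.dag_avg_runtime"  # assumed table name


def load_avg_runtimes_to_bq():
    """Read average run times from the metadata DB and push them to BigQuery."""
    mysql = MySqlHook(mysql_conn_id="airflow_db")  # assumed connection ID
    rows = mysql.get_records(AVG_RUNTIME_SQL)      # list of (dag_id, avg) tuples

    payload = [{"dag_id": dag_id, "avg_runtime_sec": float(avg)} for dag_id, avg in rows]

    client = bigquery.Client()
    errors = client.insert_rows_json(BQ_TABLE, payload)  # streaming insert
    if errors:
        raise RuntimeError(f"BigQuery insert failed: {errors}")


with DAG(
    dag_id="load_data_from_airflow_to_bq",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@monthly",  # refresh the averages once a month
    catchup=False,
) as dag:
    PythonOperator(
        task_id="load_avg_runtimes",
        python_callable=load_avg_runtimes_to_bq,
    )
```

The sketch streams the rows as JSON dictionaries rather than the post's list of lists, simply because that maps directly onto the BigQuery client's insert_rows_json call; the shape of the intermediate structure is not essential to the approach.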
2. job_for_long_running_checks – (Every Hour DAG Run)
This DAG takes advantage of the average dag_run table built above in GCP BigQuery and compares the current run time of each individual DAG against its average.
a. Using a BigQuery client, it connects to GCP, fetches the average dag_run time data, and stores it in a Python dictionary.
b. It queries the airflow_db database's dag_run table to get all the DAGs that are currently in the running state.

A rough sketch of this hourly check follows.
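Again, the post does not show the code for this DAG, so below is a sketch under the same assumptions as the previous one (Airflow 2-style imports, an airflow_db MySQL connection ID, and the assumed BigQuery table); the 1.5x threshold is an arbitrary stand-in for the post's "considerable margin" and would need tuning.

```python
# Hypothetical sketch: compare the current run time of every running DAG with
# its historical average stored in BigQuery and report the ones that look slow.
# The table name, connection ID, and 1.5x threshold are assumptions.
from datetime import datetime, timezone

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from google.cloud import bigquery

BQ_TABLE = "your-project.monitoring.dag_avg_runtime"  # assumed table name
THRESHOLD = 1.5  # flag DAGs running 50% longer than their average


def check_long_running_dags():
    # 1. Fetch the average run times from BigQuery into a Python dictionary.
    client = bigquery.Client()
    avg_runtime = {
        row["dag_id"]: row["avg_runtime_sec"]
        for row in client.query(f"SELECT dag_id, avg_runtime_sec FROM `{BQ_TABLE}`").result()
    }

    # 2. Get all DAG runs that are currently in the "running" state.
    mysql = MySqlHook(mysql_conn_id="airflow_db")  # assumed connection ID
    running = mysql.get_records(
        "SELECT dag_id, start_date FROM dag_run WHERE state = 'running'"
    )

    # 3. Compare each running DAG's elapsed time with its historical average
    #    (assuming the metadata DB stores start_date in UTC).
    now = datetime.now(timezone.utc)
    for dag_id, start_date in running:
        if dag_id not in avg_runtime:
            continue  # no history yet for this DAG
        elapsed = (now - start_date.replace(tzinfo=timezone.utc)).total_seconds()
        if elapsed > THRESHOLD * avg_runtime[dag_id]:
            # A real setup would raise an alert here (email, Slack, etc.).
            print(f"{dag_id} has been running for {elapsed:.0f}s; "
                  f"its 30-day average is {avg_runtime[dag_id]:.0f}s")


with DAG(
    dag_id="job_for_long_running_checks",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    PythonOperator(
        task_id="check_long_running_dags",
        python_callable=check_long_running_dags,
    )
```

How the flagged DAGs are surfaced (logs, email, a Slack webhook) is left open in this sketch.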