On Airflow's scheduling mechanism
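1、Airflow's scheduling mechanism
1.1、Overview
- We wanted to replace our old crontab jobs with Airflow. Its polished UI, logs and task-run analytics make scheduled jobs easier to manage and maintain. In practice, however, we did not understand how its scheduling works, and this caused many problems: scheduled tasks did not run on time, and tasks were unexpectedly backfilled. This article therefore takes a closer look at the scheduling mechanism in order to resolve these problems.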
1.2、How Airflow scheduling works
1.2.1、DAG file processing
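1、DAG directory listing interval: `dag_dir_list_interval`. This is how often (in seconds) the scheduler scans the DAG directory for new files. The default is 300 s (5 minutes).
2、`min_file_process_interval`: the default is 0 s in Airflow 1.10; Airflow 2.0 raised it to 30 s.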
This parameter ensures that a DAG definition file is not processed more often than once every `min_file_process_interval` seconds.
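The two intervals above can be pictured as two throttles on the scheduler's parsing work. The sketch below is a simplified model, not Airflow code. The class `ParseThrottle` and its methods `rescan_due` and `files_due` are hypothetical names, and time is passed in as plain seconds so the behaviour is easy to follow.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ParseThrottle:
    """Hypothetical model of dag_dir_list_interval and min_file_process_interval."""

    dag_dir_list_interval: float = 300.0     # seconds between DAG-folder listings
    min_file_process_interval: float = 0.0   # minimum seconds between parses of one file
    last_dir_listing: Optional[float] = None
    last_parsed: Dict[str, float] = field(default_factory=dict)

    def rescan_due(self, now: float) -> bool:
        """Return True (and record the time) if the DAG folder should be listed again."""
        if self.last_dir_listing is None or now - self.last_dir_listing >= self.dag_dir_list_interval:
            self.last_dir_listing = now
            return True
        return False

    def files_due(self, files: List[str], now: float) -> List[str]:
        """Return the files whose throttle window has elapsed and mark them as parsed."""
        due = [
            f for f in files
            if f not in self.last_parsed
            or now - self.last_parsed[f] >= self.min_file_process_interval
        ]
        for f in due:
            self.last_parsed[f] = now
        return due


throttle = ParseThrottle(min_file_process_interval=30)
print(throttle.files_due(["a.py", "b.py"], now=0))   # ['a.py', 'b.py'], both files are new
print(throttle.files_due(["a.py", "b.py"], now=10))  # [], still inside the 30 s window
```

With the 1.10 default of 0 s, every file would be eligible on every pass. A non-zero value trades DAG freshness for lower scheduler CPU load.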
- Processing loop:
- 1、Enumerate all the files in the DAG directory.
- 2、Start a configurable number of processes and for each one, assign a DAG file to process.
- 3、In each child process, parse the DAG file, create the necessary DagRuns given the state of the DAG's task instances, and for all the task instances that should run, create a TaskInstance (with the `SCHEDULED` state) in the ORM.
- 4、Back in the main scheduler process, query the ORM for task instances in the `SCHEDULED` state. If any are found, send them to the executor and set the task instance state to `QUEUED`.
- 5、If any of the child processes have finished, create another process to work on the next file in the series, provided that the number of running processes is less than the configured limit.
- 6、Once a process has been launched for all of the files in the DAG directory, the cycle is repeated. If the process to parse a particular DAG file is still running when the file's turn comes up in the next cycle, a new process is not launched and a process for the next file in the series is launched instead. This way, a DAG file that takes a long time to parse does not necessarily block the processing of other DAGs.
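The six steps above amount to a round-robin loop over the DAG files, with a cap on concurrent parser processes. The following discrete-time simulation is a hypothetical sketch of that loop. The function `simulate_file_processing`, the tick-based clock and the per-file `parse_time` values are all assumptions for illustration. It shows how a slow file is skipped rather than allowed to block the others.

```python
from collections import deque
from typing import Deque, Dict, List, Tuple


def simulate_file_processing(
    files: List[str], parse_time: Dict[str, int], max_procs: int, ticks: int
) -> List[Tuple[int, str]]:
    """Discrete-time model of the DAG-file loop; returns (tick, file) launches."""
    queue: Deque[str] = deque(files)   # step 1: enumerate the files in the DAG dir
    running: Dict[str, int] = {}       # file -> tick at which its parse finishes
    launched: List[Tuple[int, str]] = []
    for tick in range(ticks):
        # Step 5: reap child processes that have finished.
        for f in [f for f, done in running.items() if done <= tick]:
            del running[f]
        # Steps 2 and 5: launch parsers while below the configured limit.
        skipped = 0
        while len(running) < max_procs and skipped < len(files):
            if not queue:
                queue.extend(files)    # step 6: every file has had its turn, start a new cycle
            f = queue.popleft()
            if f in running:
                skipped += 1           # step 6: still parsing, move on to the next file
                continue
            running[f] = tick + parse_time[f]
            launched.append((tick, f))
    return launched


log = simulate_file_processing(
    ["a", "b", "c"], parse_time={"a": 5, "b": 1, "c": 1}, max_procs=2, ticks=4
)
print(log)  # [(0, 'a'), (0, 'b'), (1, 'c'), (2, 'b'), (3, 'c')]
```

File `a` takes 5 ticks to parse and `b` and `c` take 1 tick each. With two parser slots, `a` is launched only once, at tick 0, while `b` and `c` keep being re-parsed in the free slot.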
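1.2.2、Scheduled tasks
- As the figure below shows, task scheduling is performed by the Scheduler.
- The scheduler's workflow:
Step 0. Load the available DAG definitions from disk (populate the DagBag).
While the scheduler is running:
Step 1. The scheduler uses the DAG definitions to identify and/or initialize any DagRuns in the metadata DB.
Step 2. The scheduler checks the states of the TaskInstances associated with active DagRuns and resolves any dependencies between them. It identifies the TaskInstances that need to be executed, adds them to the worker queue, and sets the newly queued TaskInstances to the "queued" state in the database.
Step 3. Each available worker takes a TaskInstance from the queue and starts executing it, updating that TaskInstance's database record from "queued" to "running".
Step 4. Once a TaskInstance finishes running, the associated worker reports back to the queue and updates the TaskInstance's state in the database (e.g. "success", "failed").
Step 5. Based on the states of their finished TaskInstances, the scheduler updates the state of every active DagRun ("running", "failed", "success").
Step 6. Repeat steps 1-5.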
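Steps 2 to 5 describe two things: a small state machine for each TaskInstance, and a rule that derives the DagRun state from its tasks. The sketch below models only the states named in those steps. `TIState`, `advance` and `dagrun_state` are hypothetical names. Airflow's real rules involve more states (for example `skipped` and `upstream_failed`), so treat this as a simplified illustration.

```python
from enum import Enum
from typing import Dict, FrozenSet, Iterable


class TIState(str, Enum):
    """Subset of TaskInstance states mentioned in steps 2-5."""
    NONE = "none"
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


# Allowed transitions in this simplified model, and who performs them.
TRANSITIONS: Dict[TIState, FrozenSet[TIState]] = {
    TIState.NONE: frozenset({TIState.SCHEDULED}),                   # scheduler: dependencies met
    TIState.SCHEDULED: frozenset({TIState.QUEUED}),                 # scheduler: handed to executor (step 2)
    TIState.QUEUED: frozenset({TIState.RUNNING}),                   # worker picks it up (step 3)
    TIState.RUNNING: frozenset({TIState.SUCCESS, TIState.FAILED}),  # worker reports the result (step 4)
}


def advance(current: TIState, new: TIState) -> TIState:
    """Move a task instance to `new`, rejecting transitions the model does not allow."""
    if new not in TRANSITIONS.get(current, frozenset()):
        raise ValueError(f"illegal transition {current.value} -> {new.value}")
    return new


def dagrun_state(task_states: Iterable[TIState]) -> str:
    """Step 5, simplified: any failure fails the run; all successes complete it."""
    states = list(task_states)
    if any(s is TIState.FAILED for s in states):
        return "failed"
    if states and all(s is TIState.SUCCESS for s in states):
        return "success"
    return "running"


state = TIState.NONE
for nxt in (TIState.SCHEDULED, TIState.QUEUED, TIState.RUNNING, TIState.SUCCESS):
    state = advance(state, nxt)
print(state.value)                                       # success
print(dagrun_state([TIState.SUCCESS, TIState.RUNNING]))  # running
```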
1、Why doesn't my scheduled task run when I set start_date to datetime.now()?
Official explanation (which I didn't fully understand):
We recommend against using dynamic values as start_date, especially datetime.now() as it can be quite confusing. The task is triggered once the period closes, and in theory an @hourly DAG would never get to an hour after now as now() moves along.
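The key point of the quoted explanation is when a run fires. A DagRun for the interval that starts at `start_date` is triggered only once that interval has closed, i.e. at `start_date + schedule_interval`. The DAG file is re-parsed periodically, so `datetime.now()` is re-evaluated on every parse and the trigger time keeps moving into the future. The sketch below checks this with plain `datetime` arithmetic. It is a simplified model that ignores catchup and the scheduler's other checks. The helper names, the example date and the 5-minute parse cadence are assumptions for illustration.

```python
from datetime import datetime, timedelta
from typing import Callable, List


def first_trigger_time(start_date: datetime, interval: timedelta) -> datetime:
    """The run covering [start_date, start_date + interval) fires once that interval closes."""
    return start_date + interval


def due_at_each_parse(
    parse_times: List[datetime],
    start_date_at: Callable[[datetime], datetime],
    interval: timedelta,
) -> List[bool]:
    """For every parse of the DAG file, report whether the first run is due yet.

    `start_date_at` returns the start_date seen by a parse at time t: a fixed date
    ignores t, while datetime.now() evaluates to t itself.
    """
    return [first_trigger_time(start_date_at(t), interval) <= t for t in parse_times]


hourly = timedelta(hours=1)
t0 = datetime(2021, 1, 1)                                    # arbitrary example date
parses = [t0 + timedelta(minutes=5 * i) for i in range(25)]  # one parse every 5 min for 2 h

fixed = due_at_each_parse(parses, lambda t: t0, hourly)      # start_date=datetime(2021, 1, 1)
dynamic = due_at_each_parse(parses, lambda t: t, hourly)     # start_date=datetime.now()
print(fixed.index(True))   # 12, i.e. the parse at 01:00
print(any(dynamic))        # False
```

With a fixed `start_date`, the first run becomes due at the parse one hour later. With `datetime.now()`, `now + 1 hour` is never earlier than `now`, so the run never becomes due.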
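2、Why does the scheduled task still not run when start_date is set to a dynamic time, even after a backfill has happened?
Same answer as above.
- Although the problem is solved, a lot is still unclear to me and I haven't grasped the underlying principle; I may need to come back to this when I have time.
- The two main scheduling parameters:
- start_date (datetime): see the FAQ.
- schedule_interval is defined as a DAG argument. It preferably receives a cron expression as a `str`, or a `datetime.timedelta` object. Alternatively, you can use one of the cron "presets" such as `@hourly` or `@daily`. The cron-expression form is recommended. (Default: one day, i.e. `timedelta(days=1)`.)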
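To make the parameter concrete, the sketch below does two things. It lists the cron presets and their cron equivalents as documented for Airflow 1.10. It also enumerates the intervals that have closed between `start_date` and a given moment for a `timedelta` schedule. Assume catchup is enabled and no runs exist yet: each closed interval then corresponds to one DagRun the scheduler would create, which is where backfilled runs come from. `due_intervals` is a hypothetical helper, not an Airflow API.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

# Cron presets accepted by schedule_interval and their cron equivalents
# (as listed in the Airflow 1.10 docs; "@once" has no cron form).
CRON_PRESETS: Dict[str, Optional[str]] = {
    "@once": None,
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@quarterly": "0 0 1 */3 *",
    "@yearly": "0 0 1 1 *",
}

DEFAULT_SCHEDULE_INTERVAL = timedelta(days=1)  # the DAG default mentioned above


def due_intervals(
    start_date: datetime, interval: timedelta, now: datetime
) -> List[Tuple[datetime, datetime]]:
    """Hypothetical helper: the intervals [start, end) that have closed by `now`.

    With catchup enabled and no existing runs, the scheduler would create one
    DagRun per closed interval.
    """
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    runs: List[Tuple[datetime, datetime]] = []
    start = start_date
    while start + interval <= now:
        runs.append((start, start + interval))
        start += interval
    return runs


runs = due_intervals(datetime(2021, 1, 1), DEFAULT_SCHEDULE_INTERVAL, datetime(2021, 1, 4, 12))
print(len(runs))  # 3: Jan 1-2, Jan 2-3 and Jan 3-4; Jan 4-5 has not closed yet
```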
1.3、References