@JunQiu
2018-09-18T18:15:37.000000Z
字数 3756
阅读 2243
summary_2018/09
tools
// WebServer
Airflow 提供了一个可视化的 Web 界面。启动 WebServer 后,就可以在 Web 界面上查看定义好的 DAG 并监控及改变运行状况。也可以在 Web 界面中对一些变量进行配置。
// Worker
一般来说我们用 Celery Worker 来执行具体的作业。Worker 可以部署在多台机器上,并可以分别设置接收的队列。当接收的队列中有作业任务时,Worker 就会接收这个作业任务,并开始执行。Airflow 会自动在每个部署 Worker 的机器上同时部署一个 Serve Logs 服务,这样我们就可以在 Web 界面上方便的浏览分散在不同机器上的作业日志了。
// Scheduler
整个 Airflow 的调度由 Scheduler 负责发起,每隔一段时间 Scheduler 就会检查所有定义完成的 DAG 和定义在其中的作业,如果有符合运行条件的作业,Scheduler 就会发起相应的作业任务以供 Worker 接收。
// Flower
Flower 提供了一个可视化界面以监控所有 Celery Worker 的运行状况。这个服务并不是必要的。
1、airflow启动时,会将DAG中的相关信息写入数据库。
2、scheduler会按照指定频次查询数据库,检测是否有需要触发的任务。
3、当scheduler检测到需要触发的任务时,会向消息队列发送一条Message。
4、Celery会定时查询消息队列中是否有Message。当检测到Message时,会将Message中包含的任务信息下发给Worker,由Worker执行具体任务
// Celery:分布式消息队列
# Importing Modules
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
# some arguments for task
# start_date DAG == schedule_interval is best way
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.now(),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
'schedule_interval': timedelta(days=1),
'end_date': datetime(2018, 9, 4),
}
dag = DAG('qj_airflow_test', default_args=default_args)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t4 = BashOperator(
task_id='print_date1',
bash_command='date',
dag=dag)
# Setting up Dependencies
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream([t2, t3])
// 一些字段介绍
## DAG
要配置一个 DAG 自然需要一个 DAG 实例。在同一个 DAG 下的所有作业,都需要将它的 dag 属性设置为这个 DAG 实例。在实例化 DAG 时,通过传参数可以给这个 DAG 实例做一些必要的配置。
# dag_id
给 DAG 取一个名字,方便日后维护。
# default_args
默认参数,当属于这个DAG实例的作业没有配置相应参数时,将使用 DAG 实例的 default_args 中的相应参数。
# schedule_interval
配置 DAG 的执行周期,语法和 crontab 的一致。
## Task
Airflow 提供了很多 Operator, BashOperator 则会执行 bash_command 参数所指定的 bash 指令,PythonOperator执行 Python callables,或者GoogleCloudStorageToBigQueryOperator、kubernetes_pod_operator(可以使用docker image,Tips:startup_timeout_seconds (int) – timeout in seconds to startup the pod(默认120))。
Tips:如果在 DAG 中有设置 default_args 而在 Operator 中没有覆盖相应配置,则会使用 default_args 中的配置。
# dag
传递一个 DAG 实例,以使当前作业属于相应 DAG。
# task_id
给作业去一个名字,方便日后维护。
# owner
作业的拥有者,方便作业维护。另外有些 Operator 会根据该参数实现相应的权限控制。
# start_date
作业的开始时间,即作业将在这个时间点以后开始调度。
## 依赖的配置
除了可以使用作业实例的 set_upstream 和 set_downstream 方法外,还可以使用类似:
task1 << task2 << task3
task3 >> task4
## 区分几个时间
start date: 在配置中,它是作业开始调度时间。而在谈论执行状况时,它是调度开始时间。
schedule interval: 调度执行周期。
execution date: 执行时间,在 Airflow 中称之为执行时间,但其实它并不是真实的执行时间。
# 举个例子:
假设我们配置了一个作业的 start date 为 2017年10月1日,配置的 schedule interval 为 **00 12 * * *** 那么第一次执行的时间将是 2017年10月2日 12点 而此时记录的 execution date 为 2017年10月1日 12点。因此 execution date 并不是如其字面说的表示执行时间,真正的执行时间是 execution date 所显示的时间的下一个满足 schedule interval 的时间点。