@JunQiu
2018-09-18T18:13:06.000000Z
Airflow only picks up DAG objects that live in a module's global namespace:

dag_1 = DAG('this_dag_will_be_discovered')

# dag_2 cannot be discovered: it only exists inside the function's local scope
def my_function():
    dag_2 = DAG('but_this_dag_will_not')

my_function()
While DAGs describe how to run a workflow, Operators determine what actually gets done. (If two operators need to share information, consider combining them into a single operator or passing the data via XCom.)
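A minimal sketch of sharing a small value between two tasks with XCom, assuming Airflow 1.x's PythonOperator (the task ids and the pushed value are made up for illustration):

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG('xcom_example', start_date=datetime(2016, 1, 1))

def push(**context):
    # a PythonOperator's return value is stored as an XCom (key 'return_value')
    return 'some_filename.csv'

def pull(**context):
    # read the value pushed by the upstream task
    filename = context['ti'].xcom_pull(task_ids='push_task')
    print(filename)

push_task = PythonOperator(task_id='push_task', python_callable=push,
                           provide_context=True, dag=dag)
pull_task = PythonOperator(task_id='pull_task', python_callable=pull,
                           provide_context=True, dag=dag)
push_task >> pull_task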
How is an operator assigned to its DAG?
Operators do not have to be assigned to DAGs immediately (previously dag was a required argument):
from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('my_dag', start_date=datetime(2016, 1, 1))

# sets the DAG explicitly
explicit_op = DummyOperator(task_id='op1', dag=dag)

# deferred DAG assignment
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag

# inferred DAG assignment (linked operators must be in the same DAG)
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)
Operator relationships are set with the set_upstream() and set_downstream() methods. Since Airflow 1.8 this can also be done with the Python bitshift operators >> and <<, e.g. op1 >> op2 >> op3 << op4. This style reads very elegantly:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    (
        DummyOperator(task_id='dummy_1')
        >> BashOperator(
            task_id='bash_1',
            bash_command='echo "HELLO!"')
        >> PythonOperator(
            task_id='python_1',
            python_callable=lambda: print("GOODBYE!"))
    )
Pools: a task only joins a pool if the pool argument is set on the operator; by default tasks are not placed in any pool, e.g.
pool='ep_data_pipeline_db_msg_agg',
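A sketch of how the pool argument is typically attached to an operator (the DAG and task ids below are made up; the pool itself has to be created in the Airflow UI under Admin -> Pools, using the pool name from the note above):

from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('pool_example', start_date=datetime(2016, 1, 1))

# only as many tasks run concurrently in this pool as its configured slot count allows
throttled_task = DummyOperator(
    task_id='throttled_task',
    pool='ep_data_pipeline_db_msg_agg',
    dag=dag,
)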
Variables provide a global key/value store that any task can read:

from airflow.models import Variable

# plain string value
foo = Variable.get("foo")
# the stored JSON string is deserialized into a Python object
bar = Variable.get("bar", deserialize_json=True)
Branching
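A minimal branching sketch, assuming Airflow 1.x's BranchPythonOperator (dag and task ids are illustrative): the callable returns the task_id of the branch to follow, and the other branches are skipped.

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('branch_example', start_date=datetime(2016, 1, 1))

def choose_branch():
    # return the task_id of the branch that should run
    return 'branch_a'

branching = BranchPythonOperator(
    task_id='branching',
    python_callable=choose_branch,
    dag=dag,
)
branch_a = DummyOperator(task_id='branch_a', dag=dag)
branch_b = DummyOperator(task_id='branch_b', dag=dag)
branching >> branch_a
branching >> branch_b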
SubDAGs
# dags/subdag.py
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

# the DAG is returned by a factory method
def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        schedule_interval=schedule_interval,
        start_date=start_date,
    )
    dummy_operator = DummyOperator(
        task_id='dummy_task',
        dag=dag,
    )
    return dag
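A sketch of how the factory above would typically be wired into a parent DAG with SubDagOperator (the parent DAG name, file name, and schedule are illustrative). The sub-DAG's dag_id must be '<parent_dag_id>.<subdag_task_id>', which is why the factory takes both names:

# dags/parent_dag.py  (hypothetical file name)
from datetime import datetime
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdag import sub_dag

PARENT_DAG_NAME = 'parent_dag'
CHILD_DAG_NAME = 'child_dag'

main_dag = DAG(
    dag_id=PARENT_DAG_NAME,
    schedule_interval='@daily',
    start_date=datetime(2016, 1, 1),
)

sub_dag_task = SubDagOperator(
    subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME,
                   main_dag.start_date, main_dag.schedule_interval),
    task_id=CHILD_DAG_NAME,
    dag=main_dag,
)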
Trigger Rules
1、all_success: (default) all parents have succeeded
2、all_failed: all parents are in a failed or upstream_failed state
3、all_done: all parents are done with their execution
4、one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
5、one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
6、dummy: dependencies are just for show, trigger at will
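Trigger rules are set per task via the trigger_rule argument. A small sketch (dag and task ids are made up): the cleanup task runs once both of its parents are done, regardless of whether they succeeded.

from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('trigger_rule_example', start_date=datetime(2016, 1, 1))

extract = DummyOperator(task_id='extract', dag=dag)
load = DummyOperator(task_id='load', dag=dag)

# runs when both parents are finished, even if one of them failed
cleanup = DummyOperator(task_id='cleanup', trigger_rule='all_done', dag=dag)

extract >> cleanup
load >> cleanup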
How do you avoid backfills, i.e. automatically running all the jobs that were missed?
Option 1: use LatestOnlyOperator to skip the backfill runs; this is simple and is the recommended approach.
import datetime as dt
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator

dag = DAG(
    dag_id='latest_only_with_trigger',
    schedule_interval=dt.timedelta(hours=4),
    start_date=dt.datetime(2016, 9, 20),
)

# latest_only is skipped for every run except the most recent one
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

# with the default all_success trigger rule, task1 is therefore
# also skipped on backfill runs
task1 = DummyOperator(task_id='task1', dag=dag)
task1.set_upstream(latest_only)
Tip: this can be combined with the Trigger Rules described above.
Option 2 (not tried here): use a ShortCircuitOperator to check whether the current DAG Run is the latest one, and skip the rest of the DAG when it is not.
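A sketch of option 2 (equally untested), assuming Airflow 1.x's ShortCircuitOperator: the callable returns True only when the current time falls inside the schedule window that follows this run's execution_date, so backfill runs short-circuit and skip all downstream tasks. The dag and task ids are made up.

import datetime as dt
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator

dag = DAG(
    dag_id='short_circuit_latest_only',
    schedule_interval=dt.timedelta(hours=4),
    start_date=dt.datetime(2016, 9, 20),
)

def is_latest_run(**context):
    # the run is "latest" if now falls between this run's following schedule
    # and the one after that
    now = dt.datetime.utcnow()
    left = context['dag'].following_schedule(context['execution_date'])
    right = context['dag'].following_schedule(left)
    return left < now <= right

check_latest = ShortCircuitOperator(
    task_id='check_latest',
    python_callable=is_latest_run,
    provide_context=True,
    dag=dag,
)
task1 = DummyOperator(task_id='task1', dag=dag)
check_latest >> task1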
On Airflow's handling of task priority weights: with the default weight_rule of 'downstream', a task's effective priority is its own priority_weight plus the sum of the weights of all downstream tasks (the tasks that depend on it) when compared against other tasks; the more tasks depend on it, the larger its effective weight tends to be.
def priority_weight_total(self):
    if self.weight_rule == WeightRule.ABSOLUTE:
        return self.priority_weight
    elif self.weight_rule == WeightRule.DOWNSTREAM:
        upstream = False
    elif self.weight_rule == WeightRule.UPSTREAM:
        upstream = True
    else:
        upstream = False
    return self.priority_weight + sum(
        map(lambda task_id: self._dag.task_dict[task_id].priority_weight,
            self.get_flat_relative_ids(upstream=upstream))
    )
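In practice the weighting is controlled with the priority_weight and weight_rule arguments on each operator. A small sketch, assuming Airflow 1.9+ (where weight_rule was introduced); the dag and task ids are illustrative:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.weight_rule import WeightRule

dag = DAG('weight_example', start_date=datetime(2016, 1, 1))

# with the default 'downstream' rule, upstream_task's effective weight is
# its own priority_weight plus the weights of everything downstream of it
upstream_task = DummyOperator(task_id='upstream_task', priority_weight=1, dag=dag)

# 'absolute' uses only the task's own priority_weight
important_task = DummyOperator(
    task_id='important_task',
    priority_weight=10,
    weight_rule=WeightRule.ABSOLUTE,
    dag=dag,
)
upstream_task >> important_task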
Some important configuration parameters
Some important DAG/Operator parameters
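As an illustration only, a sketch combining some commonly tuned DAG/Operator arguments (the values and ids below are made up): retries, retry_delay and depends_on_past are passed to every task via default_args, while catchup, concurrency and max_active_runs are set on the DAG itself.

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,          # do not wait for the previous run of the same task
    'retries': 3,                      # retry failed tasks
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='important_args_example',
    default_args=default_args,
    start_date=datetime(2016, 1, 1),
    schedule_interval='@daily',
    catchup=False,                     # do not backfill missed runs
    concurrency=16,                    # max running task instances for this DAG
    max_active_runs=1,                 # max concurrent DAG runs
)

task = DummyOperator(task_id='task', dag=dag)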