@JunQiu
2018-09-18T10:13:06.000000Z
summary_2018/09 tools
Airflow only picks up DAG objects that end up in a module's globals(); a DAG created inside a function stays local and is never discovered:

```python
from airflow.models import DAG

dag_1 = DAG('this_dag_will_be_discovered')

# dag_2 cannot be used: it never reaches globals()
def my_function():
    dag_2 = DAG('but_this_dag_will_not')

my_function()
```
While DAGs describe how to run a workflow, Operators determine what actually gets done. (What about two operators that need to share information?)
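The parenthetical above presumably refers to Airflow's XCom mechanism for cross-task communication; a minimal sketch (the task names, key, and value are illustrative, and an existing `dag` object is assumed):

```python
from airflow.operators.python_operator import PythonOperator

def push_value(**context):
    # push a small value for downstream tasks to consume
    context['ti'].xcom_push(key='filename', value='/tmp/data.csv')

def pull_value(**context):
    # pull the value pushed by 'push_task'
    print(context['ti'].xcom_pull(task_ids='push_task', key='filename'))

push_task = PythonOperator(task_id='push_task', python_callable=push_value,
                           provide_context=True, dag=dag)
pull_task = PythonOperator(task_id='pull_task', python_callable=pull_value,
                           provide_context=True, dag=dag)
push_task >> pull_task
```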
How is the DAG an operator belongs to determined?
```python
from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

# Operators do not have to be assigned to DAGs immediately
# (previously dag was a required argument)
dag = DAG('my_dag', start_date=datetime(2016, 1, 1))

# sets the DAG explicitly
explicit_op = DummyOperator(task_id='op1', dag=dag)

# deferred DAG assignment
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag

# inferred DAG assignment (linked operators must be in the same DAG)
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)
```
Operator relationships are set with the set_upstream() and set_downstream() methods. Since Airflow 1.8 this can also be done with the Python bitshift operators >> and <<, e.g. `op1 >> op2 >> op3 << op4`. This style reads quite elegantly:

```python
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    (
        DummyOperator(task_id='dummy_1')
        >> BashOperator(
            task_id='bash_1',
            bash_command='echo "HELLO!"')
        >> PythonOperator(
            task_id='python_1',
            python_callable=lambda: print("GOODBYE!"))
    )
```
By default a task is not placed into any pool; specify the pool in the operator:

```python
pool='ep_data_pipeline_db_msg_agg',
```
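A minimal sketch of the argument in use (the BashOperator and its command are illustrative, and an existing `dag` object is assumed; the pool itself must first be created under Admin -> Pools in the Airflow UI):

```python
from airflow.operators.bash_operator import BashOperator

aggregate_db_message_job = BashOperator(
    task_id='aggregate_db_message_job',
    bash_command='echo "aggregating db messages"',
    # at most pool-size instances of tasks in this pool run concurrently
    pool='ep_data_pipeline_db_msg_agg',
    dag=dag)
```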
```python
from airflow.models import Variable

foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)
```
Branching
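A minimal sketch of branching with BranchPythonOperator (task names and the decision logic are illustrative, and an existing `dag` object is assumed): the callable returns the task_id of the branch to follow, and the other branch is skipped.

```python
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

def choose_branch():
    # real decision logic would go here
    return 'branch_a'

branching = BranchPythonOperator(
    task_id='branching',
    python_callable=choose_branch,
    dag=dag)

branch_a = DummyOperator(task_id='branch_a', dag=dag)
branch_b = DummyOperator(task_id='branch_b', dag=dag)
branching >> branch_a
branching >> branch_b
```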

SubDAGs

```python
# dags/subdag.py
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

# DAG is returned by a factory method
def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    dummy_operator = DummyOperator(
        task_id='dummy_task',
        dag=dag,
    )

    return dag
```
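A hedged usage sketch for the factory above (the parent file name and DAG names are assumptions): in the parent DAG the returned DAG is wrapped in a SubDagOperator, and the operator's task_id must match the child_dag_name passed to the factory:

```python
# dags/main_dag.py (assumed file name)
import datetime as dt

from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator

from subdag import sub_dag

PARENT_DAG_NAME = 'main_dag'
CHILD_DAG_NAME = 'child_dag'

main_dag = DAG(
    dag_id=PARENT_DAG_NAME,
    schedule_interval=dt.timedelta(hours=1),
    start_date=dt.datetime(2016, 1, 1),
)

sub_dag_op = SubDagOperator(
    subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME,
                   main_dag.start_date, main_dag.schedule_interval),
    task_id=CHILD_DAG_NAME,
    dag=main_dag,
)
```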
Trigger Rules
1. all_success: (default) all parents have succeeded
2. all_failed: all parents are in a failed or upstream_failed state
3. all_done: all parents are done with their execution
4. one_failed: fires as soon as at least one parent has failed; it does not wait for all parents to be done
5. one_success: fires as soon as at least one parent succeeds; it does not wait for all parents to be done
6. dummy: dependencies are just for show, trigger at will
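A minimal sketch of overriding the default all_success rule (the task name is illustrative, and an existing `dag` object is assumed) so that a cleanup task fires no matter how its parents finished:

```python
from airflow.operators.dummy_operator import DummyOperator

cleanup = DummyOperator(
    task_id='cleanup',
    trigger_rule='all_done',  # fire once every parent is done, success or failure
    dag=dag)
```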
How do you avoid backfills, i.e. automatically running all the jobs that were missed?
Option 1 (simple, recommended): use a LatestOnlyOperator to skip backfill runs:

```python
import datetime as dt

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator

dag = DAG(
    dag_id='latest_only_with_trigger',
    schedule_interval=dt.timedelta(hours=4),
    start_date=dt.datetime(2016, 9, 20),
)

# skips all backfills
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

# downstream tasks are also skipped during backfills
# (unless their trigger_rule is something other than all_success)
task1 = DummyOperator(task_id='task1', dag=dag)
task1.set_upstream(latest_only)
```

Tip: this can be combined with Trigger Rules.

Option 2 (untested): use a ShortCircuitOperator to check whether the current DAG Run is the latest one, and skip the whole DAG if it is not.
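A hedged sketch of option 2 (untested; the is-latest check below is an assumption, not an Airflow API, and the naive-datetime comparison may need adjusting on timezone-aware Airflow versions):

```python
import datetime as dt

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator

dag = DAG(
    dag_id='short_circuit_latest_only',
    schedule_interval=dt.timedelta(hours=4),
    start_date=dt.datetime(2016, 9, 20),
)

def _is_latest_run(execution_date, **kwargs):
    # a run is "latest" when the next scheduled run would still be in the future
    return execution_date + dt.timedelta(hours=4) > dt.datetime.now()

# returning False short-circuits: all downstream tasks are skipped
check_latest = ShortCircuitOperator(
    task_id='check_latest',
    python_callable=_is_latest_run,
    provide_context=True,
    dag=dag)

task1 = DummyOperator(task_id='task1', dag=dag)
check_latest >> task1
```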
On Airflow's handling of priority weights: the default weight_rule is downstream, i.e. a task competes with other tasks using the sum of its own priority_weight and the priority_weight of every downstream task (the tasks that depend on it), so the more tasks depend on it, the larger its effective weight tends to be:
```python
# excerpt from Airflow's BaseOperator
def priority_weight_total(self):
    if self.weight_rule == WeightRule.ABSOLUTE:
        return self.priority_weight
    elif self.weight_rule == WeightRule.DOWNSTREAM:
        upstream = False
    elif self.weight_rule == WeightRule.UPSTREAM:
        upstream = True
    else:
        upstream = False
    return self.priority_weight + sum(
        map(lambda task_id: self._dag.task_dict[task_id].priority_weight,
            self.get_flat_relative_ids(upstream=upstream)))
```
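A minimal sketch of overriding the rule (assuming Airflow >= 1.9, where weight_rule is a BaseOperator argument; the task name and weight are illustrative, and an existing `dag` object is assumed):

```python
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.weight_rule import WeightRule

critical_task = DummyOperator(
    task_id='critical_task',
    priority_weight=10,
    weight_rule=WeightRule.ABSOLUTE,  # use priority_weight as-is, ignore relatives
    dag=dag)
```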
Some important configuration parameters
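A hedged sketch of a few airflow.cfg [core] settings that commonly matter for throughput (the values shown are illustrative, not recommendations):

```ini
[core]
# total task slots across the whole Airflow installation
parallelism = 32
# max task instances allowed to run concurrently per DAG
dag_concurrency = 16
# max active DAG Runs per DAG
max_active_runs_per_dag = 16
# which executor to use (SequentialExecutor, LocalExecutor, CeleryExecutor, ...)
executor = LocalExecutor
```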
Some important DAG/Operator parameters
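A hedged sketch of DAG/Operator arguments that frequently need tuning (the argument names are real Airflow parameters; the dag_id and values are illustrative):

```python
import datetime as dt

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,                # don't wait on the previous run's task
    'retries': 1,                            # retry a failed task once
    'retry_delay': dt.timedelta(minutes=5),  # wait between retries
    'priority_weight': 1,                    # see priority_weight_total above
}

dag = DAG(
    dag_id='important_params_example',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=dt.datetime(2016, 1, 1),
    concurrency=16,      # max concurrently running task instances for this DAG
    max_active_runs=1,   # max concurrent DAG Runs for this DAG
    catchup=False,       # don't backfill missed intervals (cf. LatestOnlyOperator)
)

task = DummyOperator(task_id='noop', dag=dag)
```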
