[关闭]
@JunQiu 2018-09-18T18:13:06.000000Z 字数 5176 阅读 2102

airflow(tips)

summary_2018/09 tools


1、日常

1.1、airflow深入学习及一些TIPs


2、技术

1.1、airflow深入学习及一些TIPS

  1. dag_1 = DAG('this_dag_will_be_discovered')
  2. // dag_2 can not be used: it is local to my_function, so Airflow will not discover it
  3. def my_function():
  4. dag_2 = DAG('but_this_dag_will_not')
  5. my_function()
  1. // Operators do not have to be assigned to DAGs immediately (previously dag was a required argument).
  2. dag = DAG('my_dag', start_date=datetime(2016, 1, 1))
  3. # sets the DAG explicitly
  4. explicit_op = DummyOperator(task_id='op1', dag=dag)
  5. # deferred DAG assignment
  6. deferred_op = DummyOperator(task_id='op2')
  7. deferred_op.dag = dag
  8. # inferred DAG assignment (linked operators must be in the same DAG)
  9. inferred_op = DummyOperator(task_id='op3')
  10. inferred_op.set_upstream(deferred_op)
  1. // Operator relationships are set with the set_upstream() and set_downstream() methods. As of Airflow 1.8, this can also be done with the Python bitshift operators >> and <<.
  2. // example op1 >> op2 >> op3 << op4
  3. // 感觉这种写法很优雅
  4. with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
  5. (
  6. DummyOperator(task_id='dummy_1')
  7. >> BashOperator(
  8. task_id='bash_1',
  9. bash_command='echo "HELLO!"')
  10. >> PythonOperator(
  11. task_id='python_1',
  12. python_callable=lambda: print("GOODBYE!"))
  13. )
  1. // 需要在operator中显式指定pool,否则任务默认不会被放进该pool中
  2. pool='ep_data_pipeline_db_msg_agg',
  1. from airflow.models import Variable
  2. foo = Variable.get("foo")
  3. bar = Variable.get("bar", deserialize_json=True)
  1. #dags/subdag.py
  2. from airflow.models import DAG
  3. from airflow.operators.dummy_operator import DummyOperator
  4. # Dag is returned by a factory method
  5. def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
  6. dag = DAG(
  7. '%s.%s' % (parent_dag_name, child_dag_name),
  8. schedule_interval=schedule_interval,
  9. start_date=start_date,
  10. )
  11. dummy_operator = DummyOperator(
  12. task_id='dummy_task',
  13. dag=dag,
  14. )
  15. return dag
  1. // 方案一 使用LatestOnlyOperator跳过,比较简单推荐
  2. import datetime as dt
  3. from airflow.models import DAG
  4. from airflow.operators.dummy_operator import DummyOperator
  5. from airflow.operators.latest_only_operator import LatestOnlyOperator
  6. dag = DAG(
  7. dag_id='latest_only_with_trigger',
  8. schedule_interval=dt.timedelta(hours=4),
  9. start_date=dt.datetime(2016, 9, 20),
  10. )
  11. // skip all backfills
  12. latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
  13. // skip all backfills(not all_success)
  14. task1 = DummyOperator(task_id='task1', dag=dag)
  15. task1.set_upstream(latest_only)
  16. Tips:可以结合Trigger Rules使用
  17. // 方案二(没有试用过)
  18. 使用ShortCircuitOperator检查当前 DAG Run 是否为最新,如果不是最新的则直接跳过整个 DAG
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注