@JunQiu
2018-09-18T10:13:20.000000Z
tools summary_2018/09
Studying the official XCom approach:

// A value can be put into XCom from an operator by calling the xcom_push() method, through a function return value, or through the Operator's execute() method.
# Tasks can push XComs at any time by calling the xcom_push() method.
# In addition, if a task returns a value (either from its Operator's execute() method,
# or from a PythonOperator's python_callable function), then an XCom containing that value is automatically pushed.

// A value can be retrieved from XCom by key, source task_ids, and source dag_id.
# Tasks call xcom_pull() to retrieve XComs, optionally applying filters based
# on criteria like key, source task_ids, and source dag_id.
# By default, xcom_pull() filters for the keys that are automatically given to XComs
# when they are pushed by being returned from execute functions (as opposed to XComs that are pushed manually).

Within a single DAG: XComs are key-value pairs, the value can be a Python data type, and the data is presumably stored in the database:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

# Named `dag` rather than `DAG` so the instance does not shadow the imported class.
dag = DAG(
    dag_id='xcom_test',
    start_date=datetime.now(),
    schedule_interval='@once',
)

ls = ['a', 'b', 'c']


def push_function(**kwargs):
    # The return value is pushed to XCom automatically.
    return ls


push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_function,
    dag=dag,
)


def pull_function(**kwargs):
    # Using XCom on a function return value feels possibly redundant; the ['any'] key must be used.
    value = kwargs['ti'].xcom_pull(task_ids='push_task')
    return value


pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag,
)

# push_task must finish before pull_task runs.
push_task >> pull_task
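
The example above relies on the automatically pushed return value. For comparison, here is a minimal sketch of the manual route: pushing with xcom_push() under an explicit key and pulling it back with the same key. The key name 'letters' and the FakeTaskInstance class are assumptions made up for this illustration. FakeTaskInstance is only an in-memory stand-in so the two callables can be run without a scheduler; in a real DAG, Airflow supplies the actual task instance as kwargs['ti'].

```python
class FakeTaskInstance:
    """Hypothetical in-memory stand-in for Airflow's task instance (illustration only)."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self._store = store  # shared dict: (task_id, key) -> value

    def xcom_push(self, key, value):
        # Record the value under the pushing task's id and the given key.
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids=None, key='return_value'):
        # 'return_value' mirrors the default key Airflow gives to returned values.
        return self._store.get((task_ids, key))


def push_function(**kwargs):
    # Push manually under a custom key instead of relying on the return value.
    kwargs['ti'].xcom_push(key='letters', value=['a', 'b', 'c'])


def pull_function(**kwargs):
    # A manually pushed XCom has to be pulled with the same key.
    return kwargs['ti'].xcom_pull(task_ids='push_task', key='letters')


store = {}
push_function(ti=FakeTaskInstance('push_task', store))
result = pull_function(ti=FakeTaskInstance('pull_task', store))

# Pulling without a key looks under the default key, where nothing was pushed here.
default_key_result = FakeTaskInstance('pull_task', store).xcom_pull(task_ids='push_task')

print(result)
print(default_key_result)
```

In a real DAG the two callables would be wired into PythonOperator tasks exactly as in the example above, with provide_context=True on both so that kwargs['ti'] is available.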
