@JunQiu
2018-09-18T18:13:20.000000Z
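Looking into the official XCom approach:
// A value can be put into XCom from an operator by calling the xcom_push() method, or by returning it from a python_callable function or from the Operator’s execute() method.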
# Tasks can push XComs at any time by calling the xcom_push() method.
# In addition, if a task returns a value (either from its Operator’s execute() method,
# or from a PythonOperator’s python_callable function), then an XCom containing that value is automatically pushed.
// Values are pulled back out of XCom by filtering on key, source task_ids, and source dag_id.
# Tasks call xcom_pull() to retrieve XComs, optionally applying filters based
# on criteria like key, source task_ids, and source dag_id.
# By default, xcom_pull() filters for the keys that are automatically given to XComs
# when they are pushed by being returned from execute functions (as opposed to XComs that are pushed manually).
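As a sketch of the manual path described above, the two callables below push a value under an explicit key and pull it back by key and source task. `FakeTaskInstance`, the key `my_key`, `run_locally()` and the task ids are hypothetical names made up for illustration. The fake only mimics the `xcom_push()` / `xcom_pull()` calls so the callables can be tried without a running Airflow; it is not how Airflow itself stores XComs.

```python
# Toy stand-in for Airflow's TaskInstance (hypothetical, for local experiments only).
# It keeps XComs in a shared dict keyed by (task_id, key) instead of a database.
RETURN_KEY = 'return_value'  # the default key Airflow uses for returned values


class FakeTaskInstance:
    def __init__(self, task_id, store):
        self.task_id = task_id
        self.store = store

    def xcom_push(self, key, value):
        self.store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids=None, key=RETURN_KEY):
        return self.store.get((task_ids, key))


def push_function(**kwargs):
    # Manual push under an explicit key.
    kwargs['ti'].xcom_push(key='my_key', value=['a', 'b', 'c'])
    # In Airflow the return value would be pushed under the default key.
    return 'done'


def pull_function(**kwargs):
    ti = kwargs['ti']
    manual = ti.xcom_pull(task_ids='push_task', key='my_key')
    returned = ti.xcom_pull(task_ids='push_task')  # default key
    return manual, returned


def run_locally():
    store = {}
    push_ti = FakeTaskInstance('push_task', store)
    # Mimic what the operator does with the callable's return value.
    push_ti.xcom_push(key=RETURN_KEY, value=push_function(ti=push_ti))
    return pull_function(ti=FakeTaskInstance('pull_task', store))
```

In a real DAG the same two callables would be handed to PythonOperator tasks, and `ti` would be the task instance that Airflow passes in the context.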
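Within a single DAG: XComs are key/value pairs, the value can be an ordinary Python data type (as long as it can be serialized), and it is stored in the Airflow metadata database: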
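from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

# Named `dag` so that the instance does not shadow the DAG class.
dag = DAG(
    dag_id='xcom_test',
    # A fixed start_date (value chosen for illustration); datetime.now() makes scheduling unpredictable.
    start_date=datetime(2018, 9, 1),
    schedule_interval='@once'
)

ls = ['a', 'b', 'c']

def push_function(**kwargs):
    # The return value is pushed to XCom automatically.
    return ls

push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_function,
    dag=dag)

def pull_function(**kwargs):
    # No explicit key is needed for a returned value: it is stored under the
    # default key ('return_value'), which xcom_pull() filters on by default.
    value = kwargs['ti'].xcom_pull(task_ids='push_task')
    return value

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag)

push_task >> pull_task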
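Since a pushed value has to be written to the metadata database, it needs to be serializable. Depending on the Airflow version and configuration this is done with pickle or with JSON. The helper below (`survives_json_round_trip` is a hypothetical name, not part of Airflow) is a minimal sketch that assumes JSON and checks whether a value would come back unchanged.

```python
import json


def survives_json_round_trip(value):
    """Return True if value is equal to itself after a JSON encode/decode."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # json.dumps() rejects types it cannot encode, e.g. sets.
        return False
```

A list of strings such as `ls` passes this check, while a tuple does not, because JSON decodes it back as a list.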