[关闭]
@wddpct 2020-11-27T14:16:00.000000Z 字数 12248 阅读 1040

数据治理引擎任务编写与功能介绍


前言

本文档主要介绍任务编写步骤与数据治理逻辑功能集,以下内容将从一个实际可运行任务的配置文件出发,介绍一个任务的具体组成(公用模板,功能节点等),并在介绍中穿插功能集描述。

附件部分介绍表达式语法,不在任务编写与功能集部分展开。

任务编写与功能集 expr_builder

expr_builder是一个由javascript编写的程序包,可在自定义的nodejs工程下运行npm install expr_builder安装导入。expr_builder包提供了SummaryType,Context等关键数据结构用于编写测试任务。

示例任务配置文件 simple_test.js

任务配置文件最终输出为一个javascript文件(以.js为后缀)。

  1. const {SummaryType, Context} = require("expr_builder");
  2. function dict(dataSource, table, lookupColumn, targetColumn, cacheSize = 10000) {
  3. return function (node) {
  4. return node.mapQuery({
  5. "cacheSize": cacheSize,
  6. "dataSource": dataSource,
  7. "targetTable": table,
  8. "targetColumns": [targetColumn],
  9. "query": [
  10. {
  11. "column": lookupColumn,
  12. "operator": "=",
  13. "value": "$1"
  14. }
  15. ]
  16. })
  17. }
  18. }
  19. const ctx = new Context();
  20. ctx
  21. .dataSource("his")
  22. .sourceTable("medrec", "pat_master_index")
  23. .fetchCount(500)
  24. .primaryKeys('patient_id')
  25. .outPrimaryKeys("source_patient_no")
  26. .parallel(10)
  27. .dbSink("demo", "etl_test", "patient_base_info_simple_test", false, true)
  28. .name("patient_base_info_simple_test");
  29. const patient_id = ctx.column("medrec.pat_master_index.patient_id");
  30. patient_id
  31. .output("source_patient_no");
  32. const name = ctx.column("medrec.pat_master_index.name");
  33. name
  34. .filter("$1")
  35. .output("patient_name");
  36. name
  37. .map("$1|toPYCode|toUpper")
  38. .output("spell_code");
  39. name
  40. .map("$1|toWBCode|toUpper")
  41. .output("wb_code");
  42. const sex = ctx.column("medrec.pat_master_index.sex");
  43. sex
  44. .map(`multiIf($1|contains("男"),"男性",$1|contains("女"),"女性","未知的性别")`)
  45. .output("sex_name");
  46. sex
  47. .map(`multiIf($1|contains("男"),1,$1|contains("女"),2,0):Text`)
  48. .output("sex_code");
  49. const nation_name = ctx.column("medrec.pat_master_index.nation")
  50. .map('multiIf($1|contains("汉"),"汉族",$1)')
  51. .map('regexpMatch($1,"[\u4e00-\u9fa5]+族")')
  52. .output("nation_name");
  53. const birth_date = ctx.column("medrec.pat_master_index.date_of_birth");
  54. birth_date
  55. .map('$1:Date')
  56. .output("birth_date");
  57. const profession = ctx.column("medrec.pat_master_index.occupation");
  58. profession.output("profession_code")
  59. profession
  60. .pipe(dict("his", "comm.occupation_dict", "occupation_code", "serial_no"))
  61. .output("profession_id");
  62. profession
  63. .pipe(dict("his", "comm.occupation_dict", "occupation_code", "occupation_name"))
  64. .output("profession");
  65. ctx.const('true').output("is_valid");
  66. ctx.const('now()').output("oper_time");
  67. const insurance_no = ctx.column("medrec.pat_master_index.insurance_no");
  68. const vip_id = ctx.column("medrec.pat_master_index.vip_id");
  69. const cardNoAlt = ctx.concat(insurance_no, vip_id);
  70. cardNoAlt
  71. .map(`multiIf($1,"YBKH", $2,"JZKH")`)
  72. .output("card_type");
  73. cardNoAlt
  74. .map("coalesce($1,$2)")
  75. .output("card_no");
  76. module.exports = ctx;

任务说明

以下以行号作为索引进行编写与功能介绍

1~1 导入关键结构

  1. const {SummaryType, Context} = require("expr_builder");

固定模板代码。用于导入expr_builder这个npm包的SummaryType以及Context结构,提供给下文代码进行统计节点(SummaryType)与编写上下文(Context)的注册。

2~18 注册字典对象

  1. function dict(dataSource, table, lookupColumn, targetColumn, cacheSize = 10000) {
  2. return function (node) {
  3. return node.mapQuery({
  4. "cacheSize": cacheSize,
  5. "dataSource": dataSource,
  6. "targetTable": table,
  7. "targetColumns": [targetColumn],
  8. "query": [
  9. {
  10. "column": lookupColumn,
  11. "operator": "=",
  12. "value": "$1"
  13. }
  14. ]
  15. })
  16. }
  17. }

固定模板代码。自定义字典表查询函数,提供给下文的pipe功能节点用于外部数据源字典表的数据匹配。

19~19 注册编写上下文

  1. const ctx = new Context();

固定模板代码。用于注册编写上下文对象ctx,下文中所有的原始列都是通过ctx对象提供的column方法取得。

20~28 指定编写上下文属性

  1. ctx
  2. .dataSource("his")
  3. .sourceTable("medrec", "pat_master_index")
  4. .fetchCount(500)
  5. .primaryKeys('patient_id')
  6. .outPrimaryKeys("source_patient_no")
  7. .parallel(10)
  8. .dbSink("demo", "etl_test", "patient_base_info_simple_test", false, true)
  9. .name("patient_base_info_simple_test");
  1. js 代码对于换行写法比较宽容,但注意最好用 ; 符号作为完整语句的结束符号。

  2. dataSource(),sourceTable(),fetchCount()等一系列方法都是ctx对象自带的方法(但并不是所有在配置文件中出现的方法都是ctx的方法,比如下文中的output方法),但不需要分开写作以下代码,因为每个ctx的大部分自带方法都返回了ctx本身,所以这些代码引用方法的顺序可任意互换。
    ctx.dataSource("his");
    ctx.sourceTable("medrec", "pat_master_index");
    ctx.primaryKeys('patient_id');
    ctx.outPrimaryKeys("source_patient_no");
    ctx.parallel(10);
    ctx.dbSink("demo", "etl_test", "patient_base_info_simple_test", false, true);
    ctx.name("patient_base_info_simple_test");

30~30 column 方法介绍

  1. const patient_id = ctx.column("medrec.pat_master_index.patient_id");

column方法为ctx对象的自带方法,表示生成一个名为patient_id数据节点(Node) ,用于进一步的数据治理操作和输出(因为patient_id在下文中仍然有使用,所以定义成一个变量)。输入参数的格式为 schema_name.table_name.column_name。

值得注意的是,在ctx的所有定义方法集中,column方法不返回节点本身,所以column方法仅能作为数据节点最后一个使用的方法,而不能与其他ctx的方法更改位置,同时一个语句中只能出现一次。

31~32 output 方法介绍

  1. patient_id
  2. .output("source_patient_no");

output方法为数据节点(Node)自带的方法,输入参数为单一的列名(必须存在于dbsink指定的数据源表中)。

上述三行代码可以直接合并写成以下代码
ctx.column("medrec.pat_master_index.patient_id").output("source_patient_no");

值得注意的是,节点的所有定义方法集唯独output方法不返回节点本身,所以output方法仅能作为数据节点最后一个使用的方法,而不能与其他方法更改位置,同时一个语句中只能出现一次。

34~43 filter, map 方法介绍

  1. const name = ctx.column("medrec.pat_master_index.name");
  2. name
  3. .filter("$1")
  4. .output("patient_name");
  5. name
  6. .map("$1|toPYCode|toUpper")
  7. .output("spell_code");
  8. name
  9. .map("$1|toWBCode|toUpper")
  10. .output("wb_code");

由于 medrec.pat_master_index.name 代表的数据节点后文仍需使用,所以定义成了一个名为name的数据节点。之后遇到此类情况不额外说明。

filter为数据节点自带的方法,之所以能和另一个自带方法output写成链式语法的原因同ctx。之后遇到此类情况不额外说明。

值得注意的是,一个数据节点可以有多个治理逻辑并输出到多个节点中,比如上文中的name节点就分别应用了不同的数据治理逻辑输出到了patient_name, spell_code, wb_code这些输出节点中。

我们也可以书写改写以上语句为下面的代码,这类代码说明我们可以在调用任意方法后(如filter)赋值成一个数据节点,再进行进一步的数据治理工作。

  1. const name = ctx.column("medrec.pat_master_index.name");
  2. const filteredName = name.filter("$1");
  3. filteredName
  4. .output("patient_name");
  5. filteredName
  6. .map("$1|toPYCode|toUpper")
  7. .output("spell_code");
  8. filteredName
  9. .map("$1|toWBCode|toUpper")
  10. .output("wb_code");

45~61 略

  1. const sex = ctx.column("medrec.pat_master_index.sex");
  2. sex
  3. .map(`multiIf($1|contains("男"),"男性",$1|contains("女"),"女性","未知的性别")`)
  4. .output("sex_name");
  5. sex
  6. .map(`multiIf($1|contains("男"),1,$1|contains("女"),2,0):Text`)
  7. .output("sex_code");
  8. const nation_name = ctx.column("medrec.pat_master_index.nation")
  9. .map('multiIf($1|contains("汉"),"汉族",$1)')
  10. .map('regexpMatch($1,"[\u4e00-\u9fa5]+族")')
  11. .output("nation_name");
  12. const birth_date = ctx.column("medrec.pat_master_index.date_of_birth");
  13. birth_date
  14. .map('$1:Date')
  15. .output("birth_date");

略。

63~70 pipe,dict 方法介绍

  1. const profession = ctx.column("medrec.pat_master_index.occupation");
  2. profession.output("profession_code")
  3. profession
  4. .pipe(dict("his", "comm.occupation_dict", "occupation_code", "serial_no"))
  5. .output("profession_id");
  6. profession
  7. .pipe(dict("his", "comm.occupation_dict", "occupation_code", "occupation_name"))
  8. .output("profession");

pipe 方法暂不过多介绍,dict的使用方法在上文中已经介绍,这两个方法需要结合使用。

ctx.column("medrec.pat_master_index.occupation").pipe(dict("his", "comm.occupation_dict", "occupation_code", "serial_no")) 表示以occupation列与his数据源下的comm.occupation_dict.occupation_code做相等匹配,并取得该表中的serial_no字段作为数据节点。

ctx.column("medrec.pat_master_index.occupation").pipe(dict("his", "comm.occupation_dict", "occupation_code", "occupation_name")) 表示以occupation列与his数据源下的comm.occupation_dict.occupation_code做相等匹配,并取得该表中的occupation_name字段作为数据节点。

72~73 const 方法介绍

  1. ctx.const('true').output("is_valid");
  2. ctx.const('now()').output("oper_time");

const 方法为ctx自带方法,区别于column方法从数据源表中取得数据,const方法指定一个常量表达式直接输出,通常用于,is_valid,oper_time和etl_time这类标准填充字段。

75~84 表达式具名变量,concat,coalesce函数介绍

  1. const insurance_no = ctx.column("medrec.pat_master_index.insurance_no");
  2. const vip_id = ctx.column("medrec.pat_master_index.vip_id");
  3. const cardNoAlt = ctx.concat(insurance_no, vip_id);
  4. cardNoAlt
  5. .map(`multiIf($1,"YBKH", $2,"JZKH")`)
  6. .output("card_type");
  7. cardNoAlt
  8. .map("coalesce($1,$2)")
  9. .output("card_no");

END 配置文件终结配置

  1. module.exports = ctx;

固定模板代码。

任务管理工具 expr-taskcli

expr-taskcli是一个由javascript编写的命令行程序包,可在全局运行npm install -g expr-taskcli安装导入。expr-taskcli命令行工具通过提供对应的操作语法和文件读取功能帮助任务编写者在编写任务后可以方便快捷的提交到指定环境中进行保存和测试运行。

使用方式

使用npm install -g expr-taskcli安装后,可以在任意路径的命令行界面使用etl help指令查看etl的使用提示信息。以下是截止11.27号最新的使用提示输出信息。

  1. taskcli =======
  2. VERSION
  3. expr-taskcli/0.6.6 linux-x64 node-v14.15.0
  4. USAGE
  5. $ taskcli [COMMAND]
  6. TOPICS
  7. env
  8. task
  9. COMMANDS
  10. help display help for taskcli

注册环境变量 TASK_API

  1. linux && macos:
  2. export TASK_API=http://graphng-2023-develop.sy
  3. windows:
  4. (图形界面更改系统环境变量)

expr-taskcli命令行工具大部分指令需要注册一个名为TASK_API的环境变量,该环境变量指示了数据治理引擎服务地址,该服务提供了任务管理与运行调度等HTTP API,并由expr-taskcli命令行工具集成。

提交任务

在编写完一个符合语法规范的任务配置文件后,可以使用etl task:submit命令提交保存。

  1. 用法:
  2. etl task:submit filepath
  3. 示例:
  4. etl task:submit ./simple_test.js
  5. etl task:submit /home/panchengtao/task_builder_files/tests/simple_test.js

如上文所示,etl task:submit命令后接filepathfilepath可以是一个相对路径,也可以是一个配置文件绝对路径。提交成功后,将在命令行界面返回新生成的taskId与taskVersion。如果未按语法规范书写或远程服务器内部异常,都会在命令行界面打印出具体错误。

  1. 正确返回:
  2. {"taskId": 1, "taskVersion": 1}
  3. 异常返回:
  4. {"errors": "xxx"}

更新任务

  1. 用法:
  2. etl task:update taskId filepath
  3. 示例
  4. etl update 1 ./simple_test.js

如果针对同一个配置文件进行了修改操作,可以选择使用task update原地更新而不是通过task submit重复提交生成多个任务。

如上文所示,第一个参数表示要更新的目标任务Id。而目标任务Id由一开始执行了etl task:submit命令后得来。更新成功后,将会在命令行界面打印最新的taskVersion值。更新失败后,也会打印相关的异常信息。

运行任务

  1. 用法:
  2. etl task:run taskId
  3. 示例
  4. etl task:run 1

停止运行中的任务

附件-表达式引擎

所谓表达式引擎——顾名思义,是一个解析并执行表达式的程序。表达式引擎被集成在了数据治理引擎中,数据节点扩展的 map, filter, const 等方法可以接收一个表达式字符串,最后在运行任务时调用表达式引擎解析执行,比如patient_name.map("$1|toPYCode|toUpper").output("spell_code");,该语句表示应用一个接收表达式字符串的map方法,用于将patient_name转换成大写的拼音码。

SELECT to_char(now(), 'Day, DD HH12:MI:SS') FROM tbl WHERE tbl_date_col > '2020-01-01' AND tbl_int_col <= 10 AND tbl_bool_col == true;

表达式的基本书写语法形如sql语句中的关键部分,上文中加粗的部分即为常见格式。

更精确地来说,表达式是一组代码的集合,它返回一个值。每一个合法的表达式都能计算成某个值,但从概念上讲,有两种类型的表达式:有副作用的(比如赋值)和单纯计算求值的。

表达式x=7是第一类型的一个例子。该表达式使用=运算符将值7赋予变量x。这个表达式自己的值等于7。

代码3 + 4是第二个表达式类型的一个例子。该表达式使用+运算符把3和4加到一起但并没有把结果(7)赋值给一个变量。

目前表达式引擎仅支持第二类,不支持赋值表达式。

类型

数据治理引擎集成的表达式引擎支持以下数据类型,并提供了相应的数据类型显式转换函数。

类型转换函数 支持传入类型 额外说明
toInt toInt(Int), toInt(Float), toInt(Text), toInt(Timestamp), toInt(Time), toInt(Date), toInt(Numeric) 支持常量,下同
toNumeric toNumeric(Int), toNumeric(Numeric),toNumeric(Numeric,Int),toNumeric(Int,Int) 第二个Int参数表示需要保留的小数位数
toFloat toFloat(Int), toFloat(Numeric),toFloat(Float),toFloat(Text)
toBlob 暂缺
toInterval 暂缺
toTime toTime(Time), toTime(Timestamp), toTime(Text,Text) toTime(Text,Text) 第二个参数接受一个时间格式字符串,表示第一个Text的格式,比如 "mmHHss"
toDate toDate(Date), toDate(Timestamp), toDate(Text,Text) toDate(Text,Text) 第二个参数接受一个时间格式字符串,表示第一个Text的格式,比如 "yyyyMMdd"
toTimestamp toTimestamp(Timestamp), toTimestamp(Text), toTimestamp(Text,Text) toTimestamp(Text,Text) 第二个参数接受一个时间格式字符串,表示第一个Text的格式,比如 "yyyyMMdd",假如不传入,默认为RFC 3339格式 —— 2006-01-02T15:04:05Z07:00
toText toText(Text), toText(Int), toText(Numeric), toText(Float), toText(Date), toText(Date,Text), toText(Timestamp), toText(Timestamp,Text) toText(Date,Text)和 toText(Timestamp,Text)第二个参数接受一个要输出的时间格式化字符串

语法

  1. $1 > 10
  2. toText($1)
  3. multiIf($1 > 0, $1, $1 < 0, $2, $3)
  4. ...
  1. 示例
  2. const insurance_no = ctx.column("medrec.pat_master_index.insurance_no");
  3. const vip_id = ctx.column("medrec.pat_master_index.vip_id");
  4. const cardNoAlt = ctx.concat(insurance_no, vip_id);
  5. cardNoAlt
  6. .map(`multiIf($1,"YBKH", $2,"JZKH")`)
  7. .output("card_type");
  8. cardNoAlt
  9. .map("coalesce($1,$2)")
  10. .output("card_no");

表达式引擎除了支持常量表达式的简单计算外,也支持识别map, filter等方法传入的数据节点类型并进行数据治理操作。为了方便起见,表达式引擎采取了取位置索引来代表具体操作数据节点的方式。其中$1表示传入的第一个变量,当传入联合变量时(concat等节点),根据传入顺序确定位置索引。

  1. 常规
  2. toText($1)
  3. toText($1,"yyyyMMdd")
  4. toDate(toText($1,"yyyyMMdd"),"yyyyMMdd")
  5. multiIf($1,"YBKH", $2,"JZKH")
  6. colease($1, $2)
  1. 管道 |
  2. $1|toText
  3. $1|toText("yyyyMMdd")
  4. $1|toText("yyyyMMdd")|toDate("yyyyMMdd")
  5. $1|multiIf("YBKH", $2,"JZKH")
  6. $1|colease($2)

表达式引擎支持以上两种函数调用方式,应用效果完全一致(改写示例一一对应)。

在管道模式下,函数调用通过“|”符号替代括号嵌套调用,并且可以省略第一个参数的书写(默认由程序自动传入)。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注