Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up

etl engine 轻量级 跨平台 流批一体ETL引擎 数据抽取-转换-装载 ETL engine lightweight cross platform batch flow integration ETL engine data extraction transformation loading

NotificationsYou must be signed in to change notification settings

hw2499/etl-engine

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 

Repository files navigation

CN docEN doc

流批一体数据交换引擎
实现从源读取数据 -> (目标数据类型转换 | 数据分发) -> 写到目标数据源
支持数据流传输过程中进行融合计算查询
支持CDC模式增量同步方案

产品概述

  • 产品由etl-engine引擎和etl-designer云端设计器及etl-crontab调度组成,
  • etl-engine引擎负责解析etl配置文件并执行etl任务,
  • etl-designer云端设计器通过拖拉拽的方式生成etl-engine引擎可识别的etl任务配置文件,
  • etl-crontab调度设计器负责按时间周期执行指定的etl任务,etl-crontab调度还提供了查询etl任务执行日志功能,
  • 三部分组成了etl解决方案,可集成到任意使用场景。

产品详细介绍

高可用介绍

资源地址

功能特性

  • 支持跨平台执行(windows,linux),只需要一个可执行文件和一个配置文件就可以运行,无需其它依赖,轻量级引擎。
  • 输入输出数据源支持influxdb v1、clickhouse、prometheus、elasticsearch、hadoop(hive,hbase)、postgresql(兼容Greenplum))、mysql(兼容Doirs和OceanBase)、oracle、sqlserver、sqlite、rocketmq、kafka、redis、excel
  • 任意一个输入节点可以同任意一个输出节点进行组合,遵循pipeline模型。
  • 支持跨多种类型数据库之间进行数据融合查询。
  • 支持消息流数据传输过程中与多种类型数据库之间的数据融合计算查询。
  • 数据融合查询语法遵循ANSI SQL标准。
  • 为满足业务场景需要,支持配置文件中使用全局变量,实现动态更新配置文件功能。
  • 任意一个输出节点都可以嵌入go语言脚本并进行解析,实现对输出数据流的格式转换功能。
  • 支持节点级二次开发,通过配置自定义节点,并在自定义节点中配置go语言脚本,可扩展实现各种功能操作。
  • 任意一个输入节点都可以通过组合数据流拷贝节点,实现从一个输入同时分支到多个输出的场景。
  • 支持将各节点执行日志输出到数据库中。
  • 支持跟crontab调度组合配置,实现周期性执行etl-engine任务。
  • 支持MySQL CDC模式数据同步,将MySQL数据库表数据的变化实时同步到其它MySQL、Oracle、PostgreSQL、Elastic等数据库.
  • 支持PostgreSQL CDC模式数据同步,将PostgreSQL数据库表数据的变化实时同步到其它MySQL、Oracle、PostgreSQL、Elastic等数据库.

数据流特性

  • 输入输出任意组合

输入输出

  • 输出节点嵌入Go语言 方便格式转换

支持嵌入脚本语言

  • 数据流复制 方便多路输出

数据流复制

  • 自定义节点嵌入Go语言 方便实现各种操作

自定义节点

  • 转换节点嵌入Go语言 方便实现各种转换

转换节点

  • 流批一体融合查询
    支持多源输入,内存计算,融合输出融合查询语法
    流批一体融合查询

  • MySQL CDC模式数据同步
    支持将Master MySQL数据库数据实时同步到其它MySQL、Oracle、PostgreSQL、Elastic等数据库
    提供数据实时同步备份、增量对比能力视频播放地址

增量同步方案

  • PostgreSQL CDC模式数据同步
    支持将Master PostgreSQL数据库数据实时同步到其它PostgreSQL、MySQL、Oracle、Elastic等数据库
    提供数据实时同步备份、增量对比能力视频播放地址

PostgreSQL增量同步方案

调度集成方案

  • etl_crontab与etl_engine灵活组合

集成方案

  • etl-designer设计器

etl-designer设计器

  • 调度设计器

调度设计器

  • 调度日志

调度日志

  • Etl日志明细

Etl日志明细

使用方式

window平台

  etl_engine.exe -fileUrl .\graph.xml -logLevel info

linux平台

    etl_engine -fileUrl .\graph.xml -logLevel info

配置文件样例

<?xml version="1.0" encoding="UTF-8"?><Graph><Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="节点1" fetchSize="5"><Script name="sqlScript"><![CDATA[select* from (select* from t3 limit 10)]]></Script></Node><Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="节点2" dbConnection="CONNECT_02" outputFields="f1;f2" renameOutputFields="c1;c2" outputTags="tag1;tag4"  renameOutputTags="tag_1;tag_4"  measurement="t1" rp="autogen"></Node><!--<Node id="DB_OUTPUT_02" type="DB_OUTPUT_TABLE" desc="节点3" dbConnection="CONNECT_03" outputFields="f1;f2;f3"  renameOutputFields="c1;c2;c3"  batchSize="1000"><Script name="sqlScript"><![CDATA[           insert into db1.t1 (c1,c2,c3) values (?,?,?)    ]]></Script></Node>  --><Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="DB_OUTPUT_01" order="0" metadata="METADATA_01"></Line><Metadata id="METADATA_01"><Field name="c1" type="string" default="-1" nullable="false"/><Field name="c2" type="int" default="-1" nullable="false"/><Field name="tag_1" type="string" default="-1" nullable="false"/><Field name="tag_4" type="string" default="-1" nullable="false"/></Metadata><Connection id="CONNECT_01" dbURL="http://127.0.0.1:58080" database="db1" username="user1" password="******" token="" org="hw"  type="INFLUXDB_V1"/><Connection id="CONNECT_02" dbURL="http://127.0.0.1:58086" database="db1" username="user1" password="******" token="" org="hw"  type="INFLUXDB_V1"/><!--<Connection id="CONNECT_04" dbURL="127.0.0.1:19000" database="db1" username="user1" password="******" type="CLICKHOUSE"/>--><!--<Connection id="CONNECT_03" dbURL="127.0.0.1:3306" database="db1" username="user1" password="******" type="MYSQL"/>--><!--<Connection id="CONNECT_03"  database="d:/sqlite_db1.db"  batchSize="10000" type="SQLITE"/>--><!--<Connection id="CONNECT_02" dbURL="127.0.0.1:10000" database="db1" username="root" password="b" batchSize="1000" type="HIVE"/>  --><!--<Connection id="CONNECT_02" dbURL="http://127.0.0.1:9200" database="db1" username="elastic" password="******" batchSize="1000" type="ELASTIC"/>    --></Graph>

支持节点类型

任意一个读节点都可以输出到任意一个写节点

输入节点-读数据表

输出节点-写数据表

输入节点-读 excel文件

输出节点-写 excel文件

输入节点-执行数据库脚本

输出节点-垃圾桶,没有任何输出

输入节点-MQ消费者

输出节点-MQ生产者

数据流拷贝节点,位于输入节点和输出节点之间,既是输出又是输入

输入节点-读 redis

输出节点-写 redis

自定义节点,通过嵌入go脚本来实现各种操作

输入节点-执行系统脚本节点

输入节点-读取CSV文件节点

输入节点-读PROMETHEUS节点

输入节点-PROMETHEUS EXPORTER节点

输出节点-写PROMETHEUS节点

输入节点-Http节点

输入节点-读es节点

输出节点-写es节点

输入节点-增量对比节点

输入节点-读hive节点

输入节点-融合查询节点

输入节点-MySQLBinLog节点

输入节点-PG_WAL节点

输入节点-PG_WAL2JSON节点

输入节点-读Hbase节点

输入节点-写Hbase节点

组合方式

  • 任意一个输入节点都可以连接到任意一个输出节点
  • 任意一个输入节点都可以连接到一个拷贝节点
  • 一个拷贝节点可以连接到多个输出节点
  • 任意一个输入节点都可以连接到一个转换节点
  • 拷贝节点不允许连接转换节点

配置说明

节点DB_INPUT_TABLE

输入节点

属性说明
id唯一标示
type类型, DB_INPUT_TABLE
scriptsqlScript SQL语句
fetchSize每次读取记录数
dbConnection数据源ID
desc描述

支持源类型

MYSQL、Influxdb 1x、CK、PostgreSQL、Oracle、SQLServer、sqlite

样本

<Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="节点1" fetchSize="1000"><Script name="sqlScript"><![CDATA[select* from (select* from t4 limit 100000)]]></Script></Node>

节点XLS_READER

输入节点

读取EXCEL文件内容

属性说明
id唯一标示
type类型, XLS_READER
fileURL文件路径+文件名称
startRow从第几行开始读取,索引从0开始,0代表第1行(通常是列标题)
sheetName表名称
maxRow最多读几行,*代表全部,10代表读取10行
fieldMap字段映射关系,格式:field1=A;field2=B;field3=C
字段名称=第几列 多个字段之间用分号分隔

样本

<Node id="XLS_READER_01"   type="XLS_READER" desc="输入节点1"  fileURL="d:/demo/test1.xlsx" startRow="2" sheetName="人员信息" fieldMap="field1=A;field2=B;field3=C"></Node>

节点DB_OUTPUT_TABLE

输出节点

属性说明适合
id唯一标示
type类型, DB_OUTPUT_TABLE
scriptinsert、delete、update SQL语句ck,mysql,sqlite,postgre,oracle,sqlserver
batchSize每次批提交的记录数ck,mysql,sqlite,postgre,oracle
注意influx以输入时的fetchSize为批提交的大小
outputFields输入节点读数据时传递过来的字段名称influx,ck,mysql,sqlite,postgre,oracle,sqlserver
renameOutputFields输出节点到目标数据源的字段名称influx,ck,mysql,sqlite,postgre,oracle,sqlserver
dbConnection数据源ID
desc描述
outputTags输入节点读数据时传递过来的标签名称influx
renameOutputTags输出节点到目标数据源的标签名称influx
rp保留策略名称influx
measurement表名称influx
timeOffset时间抖动偏移量,用于批量写入时生成不可重复的时间戳
(该功能通过time.Sleep实现,建议通过嵌入脚本增加一个纳秒格式的time列,或调整你的time+tags)
influx

支持目标类型

MYSQL、Influxdb 1x、CK、PostgreSQL、Oracle、SQLServer、sqlite

样本

<Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="写influx节点1" dbConnection="CONNECT_02" outputFields="f1;f2;f3;f4"  renameOutputFields="c1;c2;c3;c4"  outputTags="tag1;tag2;tag3;tag4"  renameOutputTags="tag_1;tag_2;tag_3;tag_4" measurement="t5" rp="autogen"></Node><Node id="DB_OUTPUT_02" type="DB_OUTPUT_TABLE" desc="写mysql节点2" dbConnection="CONNECT_03" outputFields="time;f1;f2;f3;f4;tag1;tag2;tag3;tag4"  renameOutputFields="time;c1;c2;c3;c4;tag_1;tag_2;tag_3;tag_4" batchSize="1000"><Script name="sqlScript"><![CDATA[          insert into db1.t1 (time,c1,c2,c3,c4,tag_1,tag_2,tag_3,tag_4) values (?,?,?,?,?,?,?,?,?)    ]]></Script></Node>

节点XLS_WRITER

输出节点

写入EXCEL文件内容

属性说明
id唯一标示
typeXLS_WRITER
fileURL文件路径+文件名称
startRow从第几行开始写入 如:数字2代表是第3行(索引从0开始,0代表第1行,1代表第2行) 开始写数据
sheetName表名称
outputFields输入节点传递过来的字段名称,
格式:field1;field2;field3
renameOutputFields字段映射关系,格式:指标=B;年度=C;地区=D
字段名称=第几列 多个字段之间用分号分隔
metadataRow输出EXCEL文件中第几行输出字段名称,如:数字1代表是第1行 开始写字段名称
appendRowtrue代表追加记录模式,false代表覆盖模式。

样本

<Node id="XLS_WRITER_01"   type="XLS_WRITER" desc="输出节点2" appendRow="true"  fileURL="d:/demo/test2.xlsx" startRow="3" metadataRow="2" sheetName="人员信息" outputFields="c1;c3;tag_1"  renameOutputFields="指标=B;年度=C;地区=D"></Node>

节点DB_EXECUTE_TABLE

输入节点

执行insert ,delete ,update语句

属性说明适合
id唯一标示
typeDB_EXECUTE_TABLE
roolback是否回滚false不回滚,true回滚
sqlScriptdelete、update语句,多条语句之间用分号分隔mysql,sqlserver,sqlite,postgre,oracle,ck(不支持delete,update)
fileURL外部文件fileURL优先级别高于sqlScript,两个只能用一个

样本

<Node id="DB_EXECUTE_01" dbConnection="CONNECT_01" type="DB_EXECUTE_TABLE" desc="节点1" rollback="false"><Script name="sqlScript"><![CDATA[         insert into t_1 (uuid,name) values (13,'aaa');         insert into t_1 (uuid,name) values (14,'bbb');         insert into t_1 (uuid,name) values (15,'ccc');         insert into t_1 (uuid,name) values (1,'aaa')]]></Script>

节点OUTPUT_TRASH

输出节点

空管道,没有任何输出,适用于作为没有任何输出的节点所连接的目标节点(比如:DB_EXECUTE_TABLE节点)

样本

<Node id="OUTPUT_TRASH_01"   type="OUTPUT_TRASH" desc="节点2"></Node>

节点MQ_CONSUMER

输入节点,阻塞模式

mq消费者 (支持rocketmq)

属性说明适合
id唯一标示
typeMQ_CONSUMER
flag默认值:ROCKETMQ支持rocketmq
nameServermq服务器地址,格式:127.0.0.1:8080
groupmq组名称
topic订阅主题名称
tag标签名称,格式:*代表消费全部标签,
tag_1代表只消费tag_1标签

样本

<Node id="MQ_CONSUMER_02" type="MQ_CONSUMER" flag="ROCKETMQ" nameServer="127.0.0.1:8080" group="group_1" topic="out_event_user_info" tag="*"></Node>

mq消费者 (支持kafka)

属性说明适合
id唯一标示
typeMQ_CONSUMER
flag默认值:KAFKA支持kafka
nameServermq服务器地址,格式:127.0.0.1:8080
groupmq组名称
topic订阅主题名称
listenerFlag1是按分区进行监听 ; 2是按单通道进行监听,topic可以是多个
allPartition是否消费所有分区,true是消费所有分区
partition当allPartition为false时,partition代表消费的分区数字,可按指定分区数字进行消费,适合分流场景
saslUserName用于SASL认证
saslPassword用于SASL认证

样本

<Node id="MQ_CONSUMER_03" type="MQ_CONSUMER" flag="KAFKA" nameServer="127.0.0.1:18081" group="group_10" topic="out_event_user_info" listenerFlag="2"></Node>

节点MQ_PRODUCER

输出节点

mq生产者 (支持rocketmq)

属性说明适合
id唯一标示
typeMQ_PRODUCER
flag默认值:ROCKETMQ支持rocketmq
nameServermq服务器地址,格式:127.0.0.1:8080
groupmq组名称
topic订阅主题名称
tag标签名称,格式:tag_1
sendFlag发送模式,1是同步;2是异步;3是单向
outputFields输入节点传递过来的字段名称,
格式:field1;field2;field3 多个字段之间用分号分隔
renameOutputFields字段映射关系,格式:field1;field2;field3 多个字段之间用分号分隔

样本

<Node id="MQ_PRODUCER_01" type="MQ_PRODUCER" flag="ROCKETMQ" nameServer="127.0.0.1:8080" group="group_11" topic="out_event_system_user" tag="tag_1"          sendFlag="3" outputFields="time;tag_1;c2"  renameOutputFields="时间;设备;指标"></Node>

mq生产者 (支持kafka)

属性说明适合
id唯一标示
typeMQ_PRODUCER
flag默认值:KAFKA支持kafka
nameServermq服务器地址,格式:127.0.0.1:8080
topic订阅主题名称
isPartitiontrue代表指定分区发消息;false代表随机分区发消息
sendFlag发送模式,1是同步;2是异步
outputFields输入节点传递过来的字段名称,
格式:field1;field2;field3 多个字段之间用分号分隔
renameOutputFields字段映射关系,格式:field1;field2;field3 多个字段之间用分号分隔

样本

<Node id="MQ_PRODUCER_02" type="MQ_PRODUCER" flag="KAFKA" nameServer="127.0.0.1:18081"  topic="out_event_system_user"          sendFlag="1" outputFields="Offset;Partition;Topic;Value"  renameOutputFields="Offset;Partition;Topic;Value"></Node>

数据流拷贝节点

将一个输入节点的数据流输出到多个分支输出节点

属性说明适合
id唯一标示
typeCOPY_STREAM

样本

<Node id="COPY_STREAM_01" type="COPY_STREAM" desc="数据流拷贝节点" ></Node><Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="COPY_STREAM_01" order="1" metadata="METADATA_01" ></Line><Line id="LINE_02" type="COPY" from="COPY_STREAM_01:0" to="DB_OUTPUT_01" order="2" metadata="METADATA_01"></Line><Line id="LINE_03" type="COPY" from="COPY_STREAM_01:1" to="DB_OUTPUT_02" order="2" metadata="METADATA_02"></Line>

节点REDIS_READER

输入节点

属性说明适合
id唯一标示
typeREDIS_READER
nameServer127.0.0.1:6379
password******
db0数据库ID
isGetTTLtrue或false 是否读取ttl信息
keys读取的KEY,多个KEY之间用分号分隔目前只支持读取string,int,float类型内容
patternMatchKey默认是false,当设置为true时,证明keys中的内容是进行模式匹配的字符串,如keys=HW_*代表以HW_为前缀的所有key设置为true时,默认生成key;value;key_ttl三个固定键,用于后续读取

样本

<Node id="REDIS_READER_01"   type="REDIS_READER" desc="输入节点1"   nameServer="127.0.0.1:6379" password="******" db="0" isGetTTL="true" keys="a1;a_1" ></Node>
<Node id="REDIS_READER_01"   type="REDIS_READER" desc="输入节点1" nameServer="127.0.0.1:6379" password="******" db="0"  isGetTTL="true" patternMatchKey="true" keys="HW_*"></Node>

节点REDIS_WRITER

输出节点,通过配置patternMatchKey可实现将记录集中的某两个字段写入redis

属性说明适合
id唯一标示
typeREDIS_WRITER
nameServer127.0.0.1:6379
password******
db0数据库ID
isGetTTLtrue或false 是否写入ttl信息
patternMatchKey默认是false,当设置为true时,证明key是从renameOutputFields中指定的key中取键,从renameOutputFields中指定的value中取值适合将记录集中的某两个字段写入redis
outputFields目前只支持写string,int,float类型内容
renameOutputFields目前只支持写string,int,float类型内容

样本

<Node id="REDIS_WRITER_01"   type="REDIS_WRITER" desc="输出节点1"  nameServer="127.0.0.1:6379" password="******" db="1"   isGetTTL="true" outputFields="a1;a_1"  renameOutputFields="f1;f2"  ></Node>
<Node id="REDIS_WRITER_01"   type="REDIS_WRITER" desc="输出节点1"  nameServer="127.0.0.1:6379" password="******" db="1"   isGetTTL="true"   patternMatchKey="true"  outputFields="key;value;key_ttl"  renameOutputFields="key;value;key_ttl" ></Node>

节点CUSTOM_READER_WRITER

自定义节点,可以通过嵌入GO脚本实现各种操作

属性说明适合
id唯一标示
typeCUSTOM_READER_WRITER

节点EXECUTE_SHELL

输入节点-执行系统脚本节点

属性说明适合
id唯一标示
typeEXECUTE_SHELL
fileURL外部脚本文件位置fileURL与Script两者只能出现一个,同时出现时fileURL优先于Script
Script脚本内容
outLogFileURL控制台输出内容到指定的日志文件

样本

<Node id="EXECUTE_SHELL_01"  type="EXECUTE_SHELL" desc="节点1"  _fileURL="d:/test1.bat" outLogFileURL="d:/test1_log.txt"><Script><![CDATA[    c:    dir/w]]></Script></Node>

节点CSV_READER

输入节点-读取CSV文件节点

属性说明适合
id唯一标示
typeCSV_READER
fileURLCSV文件位置
fetchSize每次读取到内存中的批量数如:可配合influxdb中每次批量提交的记录数,曾测试1W多条数据123个字段,配置100,入库时间为15秒
startRow从第几行开始读数据,默认0代表第1行一般0是第一行列名称
fields定义输出的字段名称,多个字段间用分号分隔field1;field2;field3
fieldsIndex定义输出的列,默认0代表第1列,多个字段间用分号分隔;配置成-1代表按顺序读取所有字段"2;3;4"

样本

<Node id="CSV_READER_01"   type="CSV_READER" desc="输入节点1" fetchSize="5"  fileURL="d:/demo2.csv" startRow="1" fields="field1;field2;field3"  fieldsIndex="0;3;4"></Node>

PROMETHEUS_API_READER

输入节点-读PROMETHEUS节点

属性说明适合
id唯一标示
typePROMETHEUS_API_READER
urlprometheus服务器地址如:http://127.0.0.1:9090
Script查询API内容,只支持/api/v1/query
和 /api/v1/query_range
如:/api/v1/query?query=my_device_info{deviceCode="设备编码000"}[1d]

注意:查询返回的结果集中,__name__是度量名称;__TIME__是prometheus入库时的时间戳;__VALUE__是prometheus的value

<Node id="PROMETHEUS_API_READER_1" type="PROMETHEUS_API_READER"  url="http://127.0.0.1:9090"><Script name="sqlScript"><![CDATA[            /api/v1/query?query=my_device_info{deviceCode="设备编码000"}[1d]            ]]></Script></Node><Node id="DB_OUTPUT_TABLE_1" type="DB_OUTPUT_TABLE" batchSize="10" dbConnection="CONNECT_1" desc="" outputFields="__name__;address;deviceCode;__TIME__;__VALUE__" renameOutputFields="f1;f2;f3;f4;f5"><Script name="sqlScript"><![CDATA[insert into t_prome_info_bk(f1,f2,f3,f4,f5)values (?,?,?,?,?)]]></Script>

PROMETHEUS_EXPORTER

输入节点-PROMETHEUS EXPORTER节点

属性说明适合
id唯一标示
typePROMETHEUS_EXPORTER
exporterAddrexporter地址, IP:PORT如: :10000
exporterMetricsPathexporter路径,如:/EtlEngineExport
metricName度量名称如:Etl_Engine_Exporter
metricHelp度量描述如:样本
labels标签名称如:deviceCode;address;desc
<Node id="PROMETHEUS_EXPORTER_1" type="PROMETHEUS_EXPORTER"      exporterAddr=":10000" exporterMetricsPath="/EtlEngineExport"     metricName="Etl_Engine_Exporter" metricHelp="Etl_Engine_Exporter样本"     labels="deviceCode;address;desc"></Node>

prometheus配置文件增加如下内容:

  - job_name:"etlengine_exporter"    metrics_path:"/EtlEngineExport"     static_configs:      - targets: ["127.0.0.1:10000"]

同时会暴露一个服务地址/pushDataService用于生成数据,postman调试细节如下:

  POST 方式 URL  http://127.0.0.1:10000/pushDataService ,  Body x-www-form-urlencoded 参数:"jsondata":{"labels":{"deviceCode":"设备编码001","address":"南关区","desc":"最大值"},"value":100}

输出的数据流中会自动添加两个字段,__name__是度量名称,__VALUE__是prometheus的value,

PROMETHEUS_API_WRITER

输出节点-写PROMETHEUS节点

属性说明适合
id唯一标示
typePROMETHEUS_API_WRITER
urlprometheus服务器地址如:http://127.0.0.1:9090
metric度量名称
outputFields输入节点传递过来的字段名称
renameOutputFieldsprometheus入库时对应的标签名称,数据项与outputFields各项对应
valueFieldprometheus入库时对应的value,数据项与输入节点中已存在的字段名称对应
<Node id="DB_INPUT_TABLE_1" type="DB_INPUT_TABLE" fetchSize="1000" dbConnection="CONNECT_1"><Script name="sqlScript"><![CDATA[select f2,f3,f4 from t_prome_info ]]></Script></Node><Node id="PROMETHEUS_API_WRITER_1" type="PROMETHEUS_API_WRITER" url="http://127.0.0.1:9090" metric="my_device_info" outputFields="f2;f3" renameOutputFields="deviceCode;address" valueField="f4"></Node>

HTTP_INPUT_SERVICE

输入节点-Http节点,阻塞模式

属性说明适合
id唯一标示
typeHTTP_INPUT_SERVICE
serviceIp绑定HTTP/HTTPS服务的IP
servicePort绑定HTTP/HTTPS服务的端口
serviceName对外暴露的服务名称默认:etlEngineService
serviceCertFileHTTPS服务证书文件位置
serviceKeyFileHTTPS服务秘钥文件位置
<Node id=""type="HTTP_INPUT_SERVICE"serviceIp=""servicePort="8081"serviceName="etlEngineService"serviceCertFile=""serviceKeyFile=""></Node>postman调试:         http://127.0.0.1:8081/etlEngineService POST 方式,URL: /etlEngineService , Body:x-www-form-urlencoded 参数:"jsondata":{"rows":[{"deviceCode":"设备编码001","address":"朝阳区","desc":"最大值","value":20},{"deviceCode":"设备编码002","address":"朝阳区","desc":"最大值","value":18}]}     注意:必须传递KEY为rows的数组结构

ELASTIC_READER

输入节点-读es节点

属性说明适合
id唯一标示
typeELASTIC_READER
index索引名称
sourceFields结果集中输出的字段名称
fetchSize每次读取记录数
Script标签DSL查询语法

样本

<Node id="ELASTIC_READER_01" dbConnection="CONNECT_02"   type="ELASTIC_READER" desc="节点2"  sourceFields="custom_type;username;desc;address" fetchSize="2"><Script name="sqlScript"><![CDATA[            {"query": {"bool":{"must":[                              //{                                //"term": {"username.keyword":"王先生"  }                             //"match": {"username":""  }                             //  },                               {"term":   {"custom_type":"t_user_info"  }                               }                          ]                                   }                    }                }        ]]></Script></Node><Connection id="CONNECT_02" type="ELASTIC" dbURL="http://127.0.0.1:9200" database="es_db3" username="elastic" password="******" />

ELASTIC_WRITER

输出节点-写es节点

属性说明适合
id唯一标示
typeELASTIC_WRITER
index索引名称
idType主键输出方式: 1 代表不指定id,由es系统自己生成20位GUID;
2 代表由idExpress中指定输出的字段名称,从上一个节点的renameOutputFields中进行匹配相同的字段名称的值;
3 代表由idExpress中指定表达式配置,_HW_UUID32 表达式 代表按32位UUID自动生成主键
idExpress如:idType配置3;该参数配置 _HW_UUID32
outputFields输入节点读数据时传递过来的字段名称按顺序输出字段内容,不按字段名称
renameOutputFields输出节点到目标数据源的字段名称按顺序输出字段内容,不按字段名称

样本

<Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="节点1" fetchSize="2"><Script name="sqlScript"><![CDATA[        SELECT"t_user_info" AS custom_type,uname, udesc,uaddress,uid FROM t_u_info]]></Script></Node><Node id="ELASTIC_WRITER_01" dbConnection="CONNECT_02"  type="ELASTIC_WRITER" desc="节点2"          outputFields="custom_type;uname;udesc;uaddress;uid"         renameOutputFields="custom_type;username;desc;address;uid"        idType="3"         idExpress="_HW_UUID32"></Node><Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="ELASTIC_WRITER_01" order="0" metadata="METADATA_01"></Line><Metadata id="METADATA_01"><Field name="custom_type" type="string" default="-1" nullable="false"/><Field name="username" type="string" default="-1" nullable="false"/><Field name="desc" type="string" default="-1" nullable="false"/><Field name="address" type="string" default="-1" nullable="false"/><Field name="uid" type="string" default="-1" nullable="false"/></Metadata><Connection id="CONNECT_02" type="ELASTIC" dbURL="http://127.0.0.1:9200" database="es_db3" username="elastic" password="******" />

HIVE_READER

输入节点-读hive节点

认证方式

hive.server2.authentication = NONE
hive.server2.authentication = KERBEROS

属性说明
id唯一标示
type类型, HIVE_READER
scriptsqlScript SQL语句
fetchSize每次读取记录数
dbConnection数据源ID
authFlag认证类型:NONE 或 KERBEROS ,默认是NONE
krb5Principalkerberos用户名称,如:hive.server2.authentication.kerberos.principal = hive/_HOST@EXAMPLE.COM 中的hive
desc描述

样本

<Node id="HIVE_READER_01" dbConnection="CONNECT_01"   type="HIVE_READER" desc="节点1"  fetchSize="100"><Script name="sqlScript"><![CDATA[select* from db1.t_u_info  ]]></Script></Node><Connection id="CONNECT_01"           dbURL="127.0.0.1:10000" database="db1"           username="root" password="******"           batchSize="1000" type="HIVE"/>

INCREMENT

输入节点-增量对比节点

属性说明适合
id唯一标示
typeINCREMENT
inputSourceConnection增量对比源表数据连接ID
inputTargetConnection增量对比目标表数据连接ID
inputSourceSQL增量对比源表查询语句
inputTargetSQL增量对比目标表查询语句
inputSourceFetchSize增量对比源表每次读取记录数大小,-1为一次性全部读取完毕,注意内存是否够用。
inputSourcePrimaryKey增量对比源表主键字段(参与对比),多字段用分号分隔
inputSourceCompareKey增量对比源表其它字段(参与对比),多字段用分号分隔
inputSourceMappingKey增量对比源表其它字段(不参与对比),多字段用分号分隔
inputTargetPrimaryKey增量对比目标表主键字段(参与对比),多字段用分号分隔
inputTargetCompareKey增量对比目标表其它字段(参与对比),多字段用分号分隔
inputTargetMappingKey增量对比目标表其它字段(不参与对比),多字段用分号分隔
outputInsertConnection增量对比输出新增数据目标表数据连接ID
outputUpdateConnection增量对比输出更新数据目标表数据连接ID
outputDeleteConnection增量对比输出删除数据目标表数据连接ID
outputInsertMetadata增量对比输出新增数据目标表元数据ID
outputUpdateMetadata增量对比输出更新数据目标表元数据ID
outputDeleteMetadata增量对比输出删除数据目标表元数据ID
outputInsertSQL增量对比输出新增数据insert语句
outputUpdateSQL增量对比输出更新数据update语句
outputDeleteSQL增量对比输出删除数据delete语句
outputInsertFields增量对比输出新增数据字段名称,注意顺序要和insert语句中的字段点位符顺序保持一致,多字段用分号分隔
outputUpdateFields增量对比输出更新数据字段名称,注意顺序要和update语句中的字段点位符顺序保持一致,多字段用分号分隔
outputDeleteFields增量对比输出删除数据字段名称,注意顺序要和delete语句中的字段点位符顺序保持一致,多字段用分号分隔
outputToCopyStream默认为false,为false,则增量变化数据在当前节点直接入库(以OUTPUT_TRASH节点结束),
为true,则增量变化数据不在当前节点入库,数据流向后续节点(只能流向 COPY_STREAM ),
由下面的节点来操作数据走向。
mustConvertMetadata默认为true代表必须按Metadata配置进行格式转换,写效率低但数据质量高,
为false是不按Metadata配置进行格式转换(前提是数据质量比较高),写效率高但数据质量低(有可能在入库过程中因有脏数据导致报错失败)

样本

<?xml version="1.0" encoding="UTF-8"?><Graph><Node id="INCREMENT_01" type="INCREMENT"   inputSourceConnection="CONNECT_1"inputTargetConnection="CONNECT_1"inputSourceSQL="select * from t_s order by f1 asc"inputTargetSQL="select * from t_t order by c1 asc"outputInsertConnection="CONNECT_1"outputUpdateConnection="CONNECT_1"outputDeleteConnection="CONNECT_1"outputInsertSQL="insert into t_t (c1,c2,c3,c4) values (?,?,?,?)"outputUpdateSQL="update t_t set c3=? ,c4=?,c2=? where c1=?"outputDeleteSQL="delete from t_t where c1=?"outputInsertFields="c1;c2;c3;c4"outputUpdateFields="c3;c4;c2;c1"outputDeleteFields="c1"outputInsertMetadata="METADATA_2"outputUpdateMetadata="METADATA_2"outputDeleteMetadata="METADATA_2"inputSourcePrimaryKey="f1"inputSourceCompareKey="f2;f3"inputSourceMappingKey="f4"inputSourceFetchSize="1000"inputTargetPrimaryKey="c1"inputTargetCompareKey="c2;c3"inputTargetMappingKey="c4"></Node><Node id="OUTPUT_TRASH_01"   type="OUTPUT_TRASH" desc="垃圾桶节点1"></Node><Line id="LINE_01" type="STANDARD" from="INCREMENT_01" to="OUTPUT_TRASH_01" order="1" metadata="METADATA_2"></Line><Metadata id="METADATA_1" sortId="1"><Field name="f1" type="int" default="" nullable="true"/><Field name="f2" type="string" default="" nullable="true"/><Field name="f3" type="float" default="" nullable="true"/><Field name="f4" type="string" default="" nullable="true"/></Metadata><Metadata id="METADATA_2" sortId="1"><Field name="c1" type="string" default="" nullable="true"/><Field name="c2" type="string" default="" nullable="true"/><Field name="c3" type="float" default="" nullable="true"/><Field name="c4" type="string" default="" nullable="true"/></Metadata><Connection sortId="1" id="CONNECT_1" type="MYSQL" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" token="" org=""/></Graph>

FEDERATION_READER

输入节点-融合查询节点

属性说明
id唯一标示
type类型, FEDERATION_READER
factTableConnectionId读取事实表数据源连接ID,只能指定读取一个数据源,
factTableConnectionId和dimensionTableConnectionIds可同时设置也可单独设置
factTableQueryFetchSize每次读取事实表记录数 ,-1代表一次性全部读取完毕
factTableQuery读取事实表sql语句,对应 factTableConnectionId
dimensionTableConnectionIds读取维表数据源连接ID,多个数据源用分号分隔,
factTableConnectionId和dimensionTableConnectionIds可同时设置也可单独设置
dimensionTableQuery读取维表对应的sql语句, 多条语句用分号分隔,即对应dimensionTableConnectionIds中所指定的多个数据源连接ID所使用的SQL
federationTableAliasName融合查询中的表别名,多表之间用分号分隔,注意第一个元素是指定事实表别名,第二个元素及之后是指定维表别名,顺序及数组总数是 factTableConnectionId + dimensionTableConnectionIds
或factTableConnectionId或dimensionTableConnectionIds
federationQuery融合计算查询语句,支持ANSI SQL标准
desc描述

样本

读多种类型数据库表(维表和事实表),根据各业务表id进行关联查询,最终将关联结果写入目标表(或文件)
从不同数据源读取用户表t_user_info、 产品表t_product_info 、订单表t_order_info,并在内存中融合计算查询出所有用户的订单信息

<?xml version="1.0" encoding="UTF-8"?><Graph desc="融合查询1"><Node id="FEDERATION_READER_01" type="FEDERATION_READER"   factTableConnectionId="CONNECT_01"factTableQueryFetchSize="100"factTableQuery="select o_id,u_id,p_id,o_price,o_number,o_money,o_writetime from t_order_info ORDER BY  CAST(SUBSTRING(o_id,3) AS UNSIGNED)  ASC"dimensionTableConnectionIds="CONNECT_02;CONNECT_03"dimensionTableQuery="select u_id,u_name,u_phone from t_user_info ORDER BY  CAST(SUBSTRING(u_id,3) AS UNSIGNED)  ASC ;select p_id,p_name,p_contacts,p_desc from t_product_info ORDER BY  CAST(SUBSTRING(p_id,3) AS UNSIGNED)  ASC"federationTableAliasName="t_o;t_u;t_p"federationQuery="SELECT t_u.u_id,t_u.u_name,t_u.u_phone,t_o.o_id,t_o.o_price,t_o.o_number,t_o.o_money,t_o.o_writetime,t_p.p_id,t_p.p_name,t_p.p_contacts,t_p.p_desc  FROM  t_u inner JOIN  t_o  ON t_o.u_id=t_u.u_id inner JOIN  t_p ON t_o.p_id=t_p.p_id ORDER BY  INTEGER(SUBSTRING(t_u.u_id,3))  ASC"></Node><Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="写节点1" dbConnection="CONNECT_04" outputFields="u_id;u_name;u_phone;o_id;o_price;o_number;o_money;o_writetime;p_id;p_name;p_contacts;p_desc"  renameOutputFields="u_id;u_name;u_phone;o_id;o_price;o_number;o_money;o_writetime;p_id;p_name;p_contacts;p_desc"  batchSize="1000"><Script name="sqlScript"><![CDATA[           insert into db1.t_u_o_p_info (u_id,u_name,u_phone,o_id,o_price,o_number,o_money,o_writetime,p_id,p_name,p_contacts,p_desc) values (?,?,?,?,?,?,?,?,?,?,?,?)    ]]></Script></Node><Line id="LINE_01" type="STANDARD" from="FEDERATION_READER_01" to="DB_OUTPUT_01" order="1" metadata="METADATA_1"></Line><Metadata id="METADATA_1" sortId="1"><Field name="u_id" type="string" default="" nullable="true"/><Field name="u_name" type="string" default="" nullable="true"/><Field name="u_phone" type="string" default="" nullable="true"/><Field name="o_id" type="string" default="" nullable="true"/><Field name="o_price" type="float" default="0" nullable="false"/><Field name="o_number" type="int" default="0" nullable="false"/><Field name="o_money" type="float" default="0" nullable="false"/><Field name="o_writetime" type="string" default="" nullable="true"/><Field name="p_id" type="string" default="" nullable="true"/><Field name="p_name" type="string" default="" nullable="true"/><Field name="p_contacts" type="string" default="" nullable="true"/><Field name="p_desc" type="string" default="" nullable="true"/></Metadata><Connection  id="CONNECT_01" type="MYSQL" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" token="" org=""/><Connection  id="CONNECT_02" type="MYSQL" dbURL="127.0.0.1:3307" database="db1" username="root" password="******" token="" org=""/><Connection  id="CONNECT_03" type="MYSQL" dbURL="127.0.0.1:3308" database="db1" username="root" password="******" token="" org=""/><Connection  id="CONNECT_04" type="MYSQL" dbURL="127.0.0.1:3309" database="db1" username="root" password="******" token="" org=""/></Graph>

MYSQL_BINLOG

输入节点-MySQLBinLog节点,阻塞模式

属性说明
id唯一标示
type类型, MYSQL_BINLOG
masterAddressMySQL数据库地址:127.0.0.1:3306
masterUserNameMySQL数据库登录用户名称
masterPasswordMySQL数据库登录用户密码
masterCanalIncludeTableRegex监听的源表名称,可写正式表达式,多个表可用;分隔,格式:db1.t_1
masterCanalExcludeTableRegex排除监听的源表名称,与masterCanalIncludeTableRegex是互斥关系
slaveOutputConnectionId监听到数据变化后,输出的目标数据源ID
slaveOutputMetaDataId监听到数据变化后,输出的目标元数据ID,可设置输出数据的格式
outputToCopyStreamfalse时代表在本节点直接将源表变化的数据入库到目标表
masterExtInfo默认无需配置,用于多实例观测服务心跳使用

样本


将MySQL数据库中的数据变化输出到其它MySQL、Oracel、PostgreSQL、Elastic等目标数据库中

  • MySQL配置文件,配置事项
server-id = 123 log-bin    = /opt/mysql/data/mysql-binbinlog_format = ROW
  • etl engine任务配置
<?xml version="1.0" encoding="UTF-8"?><Graph runMode="2" desc="读mysqlbin写db"><Node id="MYSQL_BINLOG_01"   type="MYSQL_BINLOG" desc="MYSQL_BINLOG输入节点1"      masterAddress="127.0.0.1:3306"      masterUserName="root"      masterPassword="******"      masterCanalIncludeTableRegex="db1.t_order_info"      masterCanalExcludeTableRegex=""      slaveOutputConnectionId="CONNECT_02"      slaveOutputMetaDataId="METADATA_01"      outputToCopyStream="false"></Node><Node id="OUTPUT_TRASH_01" type="OUTPUT_TRASH" desc="输出1"></Node><Line id="LINE_01" type="STANDARD" from="MYSQL_BINLOG_01" to="OUTPUT_TRASH_01" order="0" metadata="METADATA_01"></Line><Metadata id="METADATA_01"><Field name="hw_u1.t_1.c1" type="int" default="0" nullable="false"/><Field name="hw_u1.t_1.c2" type="string" default="" nullable="false"/><Field name="hw_u1.t_222.c3" type="decimal" default="0" nullable="false" dataFormat="2" /><Field name="hw_u1.t_222.writetime" type="datetime" default="" nullable="false" dataFormat="YYYY-MM-DD hh:mm:ss"/><Field name="hw_u1.t_order_info.o_writetime" type="datetime" default="" nullable="false" dataFormat="YYYY-MM-DD hh:mm:ss"/></Metadata><Connection id="CONNECT_01" dbURL="127.0.0.1:3306" database="db2" username="root" password="******" batchSize="10000" type="MYSQL"/><Connection id="CONNECT_02" type="ORACLE" dbURL="127.0.0.1:1521" database="orcl" username="hw_u1" password="******" token="" org=""/><Connection id="CONNECT_03"  dbURL="127.0.0.1:5432" database="postgres" username="hw_u1" password="******"  batchSize="1000" type="POSTGRES"/></Graph>

PG_WAL

输入节点-PG_WAL节点,阻塞模式

属性说明
id唯一标示
type类型, PG_WAL
masterAddressPostgreSQL数据库地址:127.0.0.1:3306
masterUserNamePostgreSQL数据库登录用户名称
masterPasswordPostgreSQL数据库登录用户密码
masterDataBasePostgreSQL数据库名称
masterPublicationTables监听的源表名称,多个表可用;分隔,格式:hw_u1.t_1;hw_u2.t_2
slaveOutputConnectionId监听到数据变化后,输出的目标数据源ID
slaveOutputMetaDataId监听到数据变化后,输出的目标元数据ID,可设置输出数据的格式
outputToCopyStreamfalse时代表在本节点直接将源表变化的数据入库到目标表
masterExtInfo默认无需配置,用于多实例观测服务心跳使用

样本


将PostgreSQL数据库中的数据变化输出到其它MySQL、Oracel、PostgreSQL、Elastic等目标数据库中。

  • postgresql.conf配置文件,配置事项
   wal_level = logical   max_replication_slots = 20   max_wal_senders = 20
  • pg_hba.conf配置文件,配置事项
host    all         all              0.0.0.0/0               md5
  • 监听的数据表均需要执行以下操作,如表hw_u1.t_4
ALTER TABLE hw_u1.t_4  REPLICA IDENTITY  FULL;
  • etl engine任务配置
<?xml version="1.0" encoding="UTF-8"?><Graph runMode="2" desc="读pgWal写db"><Node id="PG_WAL_01"   type="PG_WAL" desc="PG_WAL输入节点1"      masterAddress="127.0.0.1:5432"          masterDataBase="postgres"      masterUserName="hw_u1"      masterPassword="******"      masterPublicationTables="hw_u1.t_1"          slaveOutputConnectionId="CONNECT_01"      slaveOutputMetaDataId="METADATA_01"      outputToCopyStream="false"></Node><Node id="OUTPUT_TRASH_01" type="OUTPUT_TRASH" desc="输出1"></Node><Line id="LINE_01" type="STANDARD" from="PG_WAL_01" to="OUTPUT_TRASH_01" order="0" metadata="METADATA_01"></Line><Metadata id="METADATA_01"><Field name="hw_u1.t_1.c1" type="int" default="0" nullable="false"/></Metadata><Connection id="CONNECT_01" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" batchSize="10000" type="MYSQL"/><Connection  id="CONNECT_02" type="ORACLE" dbURL="127.0.0.1:1521" database="orcl" username="hw_u1" password="******" token="" org=""/><Connection id="CONNECT_03"  dbURL="127.0.0.1:5432" database="postgres" username="hw_u1" password="******"  batchSize="1000" type="POSTGRES"/>

PG_WAL2JSON

输入节点-PG_WAL2JSON节点,阻塞模式

属性说明
id唯一标示
type类型, PG_WAL
masterAddressPostgreSQL数据库地址:127.0.0.1:3306
masterUserNamePostgreSQL数据库登录用户名称
masterPasswordPostgreSQL数据库登录用户密码
masterDataBasePostgreSQL数据库名称
masterAddTables监听的源表名称,多个表可用;分隔,格式:hw_u1.t_1;hw_u2.t_2
masterFilterTables排除监听的源表名称,多个表可用;分隔,格式:hw_u1.t_1;hw_u2.t_2 ,与masterAddTables是互斥关系
slaveOutputConnectionId监听到数据变化后,输出的目标数据源ID
slaveOutputMetaDataId监听到数据变化后,输出的目标元数据ID,可设置输出数据的格式
outputToCopyStreamfalse时代表在本节点直接将源表变化的数据入库到目标表
masterExtInfo默认无需配置,用于多实例观测服务心跳使用

样本


将PostgreSQL数据库中的数据变化输出到其它MySQL、Oracel、PostgreSQL、Elastic等目标数据库中。

wal2json解码器,PostgreSQL需要安装wal2json.so插件 ,配置参考 PG_WAL 节点

HBASE_READER

输入节点-读Hbase节点
通过thrift2服务接口访问,默认会输出每行的_rk_ 代表行键。

属性说明
id唯一标示
type类型, HBASE_READER
fetchSize每次读取记录数,根据服务端配置决定,建议1000
tableName表名称,格式遵循 命名空间:表名称 或 表名称
limitRow读取结果集最大上限,0为不限制
maxVersions读取结果集所属版本号,根据情况配置,建议越大越好
cols读取字段名称,格式遵循 列簇:列标示符 或 列簇,多个列之间用分号分隔,如:info:name;info:age;info:height;data
Scriptfilter过滤器表达式,如:SingleColumnValueFilter('info','age',>=,'binary:24')

样本

<Node id="HBASE_READER_01" dbConnection="CONNECT_01" type="HBASE_READER" desc="读hbase节点1"fetchSize="1000"tableName="t_user_info1"limitRow="0"maxVersions="100"cols="info:name;info:age;info:height;data:address"><Script name="sqlScript"><![CDATA[  SingleColumnValueFilter('info','age',>=,'binary:24') ]]></Script></Node><Connection id="CONNECT_01"  dbURL="127.0.0.1:9090" database="" username="" password=""  batchSize="1000" type="HBASE"/>

HBASE_WRITER

输入节点-写Hbase节点
通过thrift2服务接口访问。

属性说明
id唯一标示
type类型, HBASE_WRITER
batchSize批量提交记录数,根据服务端配置决定,建议1000
tableName表名称,格式遵循 命名空间:表名称 或 表名称
outputFields输入节点读数据时传递过来的字段名称
renameOutputFields输出节点到目标数据源的字段名称,格式遵循 列簇:列标示符 如:_rk_;info:name;info:age;info:height;data
rowKey代表行键,从 renameOutputFields 中匹配字段名称并获取,如果不填则系统默认生成 rk_uuid 格式

样本

<Node id="HBASE_WRITER_01" dbConnection="CONNECT_02" type="HBASE_WRITER" desc="写hbase节点1" batchSize="1000"  tableName="hw_ns:t_etl_logs" outputFields="uuid;username;fileurl;taskdesc;writetime"     renameOutputFields="uuid;username;fileurl;taskdesc;writetime"     rowKey="uuid"></Node><Connection id="CONNECT_02"  dbURL="127.0.0.1:9090" database="" username="" password=""  batchSize="" type="HBASE"/>

元数据Metadata

元数据文件定义目标数据格式(如输出节点中定义的renameOutputFields或renameOutputTags所对应的字段名称及字段类型)outputFields是输入节点中数据结果集中的字段名称, 将outputFields定义的字段转换成renameOutputFields定义的字段,其renameOutputFields转换格式通过元数据文件来定义。

属性说明适合
id唯一标示
field
name输出数据源的字段名称renameOutputFields,
renameOutputTags
type输出数据源的字段类型string,int,int32,float,
str_timestamp,decimal,
datetime,timestamp,blob
default默认值当nullable为false时,如果输出值为空字符串,则可以通过default来指定输出的默认值
nullable是否允许为空false不允许为空,必须和default配合使用。true允许为空。
errDefault如果输入数据向输出数据类型转换失败时,是否启动默认值如果设置值,则转换出错时也能向下执行,即出错的值使用该默认值,
如果不设置该值,则转换出错时不能向下执行。
dataFormat对日期输出格式的配置string -> datetime 或 datetime -> string 需要配置日期格式
日期格式配置如: YYYY-MM-DD hh:mm:ss 或YYYY-MM-DD hh:mm:ssZ+8h
dataLen对小数位格式的配置string -> decimal 格式设置输出数字小数点位数,代表保留小数点后几位,或用于日期输出时定义格式长度

数据源Connection

属性说明适合
id唯一标示
type数据源类型INFLUXDB_V1、MYSQL(兼容Doirs和OceanBase)、CLICKHOUSE、SQLSERVER、SQLITE、POSTGRES(兼容Greenplum)、ORACLE、ELASTIC、HIVE、HBSE
dbURL连接地址ck,mysql,influx,postgre,oracle,elastic,hive,sqlserver
database数据库名称ck,mysql,influx,sqlite,postgre,oracle,elastic,hive,sqlserver
username用户名称ck,mysql,influx,postgre,oracle,elastic,hive,sqlserver
password密码ck,mysql,influx,postgre,oracle,elastic,hive,sqlserver
tokentoken名称influx 2x
org机构名称influx 2x
rp数据保留策略名称influx 1x
  • 常用数据源连接
<Connection id="CONNECT_01" dbURL="http://127.0.0.1:58086" database="db1" username="user1" password="******" token="" org="hw"  type="INFLUXDB_V1"/><Connection id="CONNECT_02" dbURL="127.0.0.1:19000" database="db1" username="user1" password="******" batchSize="1000" type="CLICKHOUSE"/><Connection id="CONNECT_03" dbURL="127.0.0.1:3306" database="db1" username="user1" password="******"  batchSize="1000" type="MYSQL"/><Connection id="CONNECT_04"  database="d:/sqlite_db1.db"  batchSize="10000" type="SQLITE"/><Connection id="CONNECT_05" dbURL="127.0.0.1:10000" database="db1" username="root" password="b" batchSize="1000" type="HIVE"/><Connection id="CONNECT_06" dbURL="127.0.0.1:5432" database="db_1" username="u1" password="******" batchSize="1000" type="POSTGRES" /><Connection id="CONNECT_07" dbURL="http://127.0.0.1:9200" database="db1" username="elastic" password="******" batchSize="1000" type="ELASTIC"/><Connection id="CONNECT_08" dbURL="127.0.0.1:1521" database="orcl" username="c##u1" password="******"  batchSize="1000" type="ORACLE" /><Connection id="CONNECT_09"  dbURL="127.0.0.1:9090" database="" username="" password=""  batchSize="1000" type="HBASE"/><Connection id="CONNECT_10"  dbURL="127.0.0.1:1433" database="master" username="sa" password="******"  batchSize="1000" type="SQLSERVER"/>

Graph

属性说明适合
runMode1串行模式;2并行模式默认推荐使用并行模式,
如果需要各流程排序执行,可使用串行模式

连接线Line

属性说明适合
id唯一标示
from输入节点唯一标示
to输出节点唯一标示
typeSTANDARD 标准,一进一出,COPY 复制数据流,中间环节复制数据
order串行排序号,按正整数升序排列,在graph属性runMode为1时,
通过配置0,1,2这种方式实现串行执行
metadata目标元数据ID

支持配置全局变量

通过命令行方式传递全局变量

etl_engine -fileUrl ./global6.xml -logLevel debug arg1="d:/test3.xlsx" arg2=上海

其中arg1arg2是从命令行传递进来的全局变量

  • 配置文件中引用全局变量

<Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="节点1" fetchSize="500"><Script name="sqlScript"><![CDATA[select* from (select* from t5 where tag_1='${arg2}' limit 1000)    ]]></Script><Node id="XLS_WRITER_01"   type="XLS_WRITER" desc="输出节点2" appendRow="true"  fileURL="${arg1}"  startRow="3" metadataRow="2" sheetName="人员信息" outputFields="c1;c3;tag_1"  renameOutputFields="指标=B;年度=C;地区=D">

配置文件中${arg1} 会在服务运行时通过命令行参数arg1的值d:/test3.xlsx被替换掉
配置文件中${arg2} 会在服务运行时通过命令行参数arg2的值上海 被替换掉

  • 内置变量说明

为方便生成固定格式化内容,系统内置了常用变量,方便用于配置全局变量时动态替换变量值。内置变量前缀_HW_

  1. 时间变量
    格式:_HW_YYYY-MM-DD hh:mm:ss.SSS
    输出当前系统时间,如: 2022-01-02 19:33:06.108
    注意空格通过0x32进行转义,因此正确传递方式是
    _HW_YYYY-MM-DD0x32hh:mm:ss.SSS
    YYYY 输出四位年 2022MM 输出两位月 01DD 输出两位日 02hh 输出两位小时 19mm 输出两位分钟 33ss 输出两位秒 06.SSS 输出一个前缀.和三位毫秒 .108

以上部分可随意组合,如:_HW_YYYYMM ,输出202201

  1. 时间位移变量
    在原有时间变量基础上,大写Z字符代表对时、分钟、秒的加减位移。


    如格式:_HW_YYYY-MM-DD hh:mm:ss.SSSZ2h45m
    输出当前时间累加2小时45分钟后的时间。
    如格式:_HW_YYYY-MM-DD hh:mm:ssZ-24h10m
    大写字符Z后面跟随负数可实现减位移。
    输出当前时间减少24小时10分钟后的时间。
    支持的时间频率单位如下:
    "ns", "us" (or "µs"), "ms", "s", "m", "h"


    在原有时间变量基础上,小写z字符代表对年、月、日的加减位移。


    如格式:_HW_YYYY-MM-DD hh:mm:ssz1,2,3
    输出当前时间累加1年2个月3天
    如格式:_HW_YYYY-MM-DD hhz-1,-2,-3
    输出当前时间减少1年2个月3天

  2. 时间戳变量
    格式:_HW_timestamp10
    输出当前系统10位时间戳,如:1669450716
    格式:_HW_timestamp13
    输出当前系统13位时间戳,如:1669450716142
    格式:_HW_timestamp16
    输出当前系统16位时间戳,如:1669450716142516
    格式:_HW_timestamp19
    输出当前系统19位时间戳,如:1669450716142516700

  3. UUID变量
    格式:_HW_UUID32
    输出32位UUID,如:D54C3C7163844E4DB4F073E8EEC83328
    格式:_HW_uuid32
    输出32位UUID,如:d54c3c7163844e4dB4f073e8eec83328

  • 通过命令行方式传递内置变量

etl_engine -fileUrl ./global6.xml -logLevel debug arg1=_HW_YYYY-MM-DD0x32hh:mm:ss.SSS arg2=_HW_YYYY-MM-DD
  • 配置文件中引用内置变量

<Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="节点1" fetchSize="500"><Script name="sqlScript"><![CDATA[select* from (select* from t5 where tag_1='${arg1}' limit 1000)    ]]></Script><Node id="XLS_WRITER_01"   type="XLS_WRITER" desc="输出节点2" appendRow="true"  fileURL="${arg2}.xlsx" _fileURL="d:/demo/test2.xlsx" startRow="3" metadataRow="2" sheetName="人员信息" outputFields="c1;c3;tag_1"  renameOutputFields="指标=B;年度=C;地区=D">

支持解析嵌入go语言

可以在任意一个输出节点的<BeforeOut></BeforeOut> 标签内嵌入自己的业务逻辑,更多介绍

  • 增加字段

可以增加多个字段,并赋予默认值

package extimport ("errors""fmt""strconv""github.com/tidwall/gjson""github.com/tidwall/sjson")func RunScript(dataValue string) (result string, topErr error) {newRows :=""rows := gjson.Get(dataValue,"rows")for index, row := rangerows.Array() {  //tmpStr, _ :=sjson.Set(row.String(),"addCol1",time.Now().Format("2006-01-02 15:04:05.000"))tmpStr, _ :=sjson.Set(row.String(),"addCol1","1")tmpStr, _ = sjson.Set(tmpStr,"addCol2","${arg2}")newRows, _ = sjson.SetRaw(newRows,"rows.-1", tmpStr)}return newRows, nil}
  • 合并字段

可以将多个字段合并为一个字段

package extimport ("errors""fmt""strconv""github.com/tidwall/gjson""github.com/tidwall/sjson")func RunScript(dataValue string) (result string, topErr error) {newRows :=""rows := gjson.Get(dataValue,"rows")for index, row := rangerows.Array() {area :=gjson.Get(row.String(),"tag_1").String()year :=gjson.Get(row.String(),"c3").String()tmpStr, _ :=sjson.Set(row.String(),"tag_1", area +"_" + year)newRows, _ = sjson.SetRaw(newRows,"rows.-1", tmpStr)}return newRows, nil}
  • 完整样本

<?xml version="1.0" encoding="UTF-8"?><Graph>   <Node   type="CSV_READER" desc="输入节点1" fetchSize="500"  fileURL="d:/demo.csv" startRow="1" fields="field1;field2;field3;field4"  fieldsIndex="0;1;2;3"  >  </Node>     <Node   type="OUTPUT_TRASH" desc="节点2"  >        <BeforeOut>            <![CDATA[package extimport ("errors""fmt""strconv""strings""time""github.com/tidwall/gjson""github.com/tidwall/sjson""etl-engine/etl/tool/extlibs/common""io/ioutil""os")func RunScript(dataValue string) (result string, topErr error) {defer func() {if topLevelErr := recover(); topLevelErr != nil {topErr = errors.New("RunScript 捕获致命错误" + topLevelErr.(error).Error())} else {topErr = nil}}()newRows := ""GenLine(dataValue,"db1","autogen","t13","field2","field3;field4")return newRows, nil}//接收的是JSONfunc GenLine(dataValue string, db string, rp string, measurement string, fields string, tags string) error {head := "# DML\n# CONTEXT-DATABASE: " + db + "\n# CONTEXT-RETENTION-POLICY: " + rp + "\n\n"line := ""    fieldLine := ""    tagLine := ""_t_ := strings.Split(tags, ";")_f_ := strings.Split(fields, ";")rows := gjson.Get(dataValue, "rows")for _, row := range rows.Array() {        fieldLine = ""        tagLine = ""for i1 := 0; i1 < len(_t_); i1++ {tagValue := gjson.Get(row.String(), _t_[i1])tagLine = tagLine + _t_[i1] + "=\"" + tagValue.String() + "\","}tagLine = tagLine[0 : len(tagLine)-1]for i1 := 0; i1 < len(_f_); i1++ {fieldValue := gjson.Get(row.String(), _f_[i1])fieldLine = fieldLine + _f_[i1] + "=" + fieldValue.String() + ","}fieldLine = fieldLine[0 : len(fieldLine)-1]if len(tagLine) > 0 && len(fieldLine) > 0 {    line = line + measurement + "," + tagLine + " " + fieldLine + " " + strconv.FormatInt(time.Now().Add(500*time.Millisecond).UnixNano(), 10) + "\n"        } else {                        if len(fieldLine) > 0 {                line = line + measurement + "," + fieldLine + " " + strconv.FormatInt(time.Now().Add(500*time.Millisecond).UnixNano(), 10) + "\n"            }        }}if len(line) > 0 {txt := head + linefileName := "d:/"+strconv.FormatInt(time.Now().UnixNano(), 10)WriteFileToDB(fileName, txt)err1 := os.Remove(fileName)if err1 != nil {fmt.Println("删除临时文件失败:", fileName)return err1}}return nil}func WriteFileToDB(fileName string, txt string) {buf := []byte(txt)err := ioutil.WriteFile(fileName, buf, 0666)if err != nil {fmt.Println("写入文件失败:", err)return} else {cmdLine := "D:/software/influxdb-1.8.10-1/influx.exe  -import -path=" + fileName + " -host 127.0.0.1 -port 58086 -username u1 -password 123456 -precision=ns"//fmt.Println("cmdLine:",cmdLine)common.Command3("GB18030", "cmd", "/c", cmdLine)}}              ]]>        </BeforeOut>    </Node>      <Line type="STANDARD" from="CSV_READER_01" to="OUTPUT_TRASH_01" order="0" metadata="METADATA_03">线标注</Line>    <Metadata>        <Field name="field1" type="string" default="-1" nullable="false"/>        <Field name="field2" type="string" default="-1" nullable="false"/>        <Field name="field3" type="string" default="-1" nullable="false"/>        <Field name="field4" type="string" default="-1" nullable="false"/>    </Metadata>   </Graph>

技术交流群

微信技术交流群

合作模式

欢迎对接合作

etl-engine 全行业可接...

``` @auth Mr Huang vx:weigeonlyyou```

About

etl engine 轻量级 跨平台 流批一体ETL引擎 数据抽取-转换-装载 ETL engine lightweight cross platform batch flow integration ETL engine data extraction transformation loading

Topics

Resources

Stars

Watchers

Forks

Packages

No packages published

Languages


[8]ページ先頭

©2009-2025 Movatter.jp