etl-engine readme for English


etl-engine

CN doc | EN doc

Batch-stream integrated data exchange engine

Reads data from the source -> (target data type conversion | data distribution) -> writes to the target data source

Supports fusion computing queries during data stream transmission

Product overview

  • The product consists of the etl-engine engine, the etl-designer cloud designer, and the etl-crontab scheduler.
  • etl-engine parses etl configuration files and executes etl tasks.
  • etl-designer is a drag-and-drop cloud designer that generates etl task configuration files recognized by the etl-engine engine.
  • etl-crontab executes the specified etl tasks on a time schedule and also provides queries of etl task execution logs.
  • Together, the three parts form an etl solution that can be integrated into any usage scenario.

Product details

High availability introduction

Resource address

  • etl-engine download address

The current version was last compiled on 20230310.

Download address

  • etl-designer designer video playback address

etl-designer supports OEM publishing (currently integrated into etl_crontab).

Video playback address

  • etl-crontab scheduling designer video playback address

Video playback address

etl_crontab instructions

  • Sample etl-engine configuration file

Examples of using etl-engine

Functional characteristics

  • Supports cross-platform execution (Windows, Linux). Only one executable and one configuration file are needed to run, with no other dependencies; it is a lightweight engine.
  • Input/output data sources include influxdb v1, clickhouse, prometheus, elasticsearch, hadoop (hive), postgresql, mysql, oracle, sqlite, rocketmq, kafka, redis, and excel.
  • Any input node can be combined with any output node, following the pipeline model.
  • Supports data fusion queries across multiple types of databases.
  • Supports data fusion calculation queries across multiple types of databases during message-stream data transmission.
  • The data fusion query syntax follows the ANSI SQL standard.
  • Global variables can be used in configuration files so that configurations are updated dynamically to meet service requirements.
  • Go language scripts can be embedded in any output node and parsed to convert the format of the output data stream.
  • Supports secondary development at the node level: by configuring custom nodes with Go language scripts, various functions can be extended and implemented.
  • Any input node can replicate its data flow so that one input branches to multiple outputs simultaneously.
  • The execution logs of each node can be output to the database.
  • Combined with crontab scheduling, etl-engine tasks can be executed periodically.

Data flow characteristic

  • Any combination of input and output


  • Parsing embedded scripting language facilitates format conversion


  • Data stream replication facilitates multiplexing


  • Custom nodes facilitate various operations


  • Transition nodes facilitate various transformations


  • Stream-batch integrated query
    Supports multi-source input, in-memory computing, and fused output

Scheduling integration scheme

  • Flexible combination of etl_crontab and etl_engine


  • etl-designer


  • Scheduling designer


  • Scheduling log


  • Etl log details


Mode of use

Windows platform

  etl_engine.exe -fileUrl .\graph.xml -logLevel info

Linux platform

    etl_engine -fileUrl ./graph.xml -logLevel info

Example configuration file

<?xml version="1.0" encoding="UTF-8"?>
<Graph>
  <Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="node 1" fetchSize="5">
    <Script name="sqlScript"><![CDATA[select * from (select * from t3 limit 10)]]></Script>
  </Node>
  <Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="node 2" dbConnection="CONNECT_02" outputFields="f1;f2" renameOutputFields="c1;c2" outputTags="tag1;tag4" renameOutputTags="tag_1;tag_4" measurement="t1" rp="autogen"></Node>
  <!--
  <Node id="DB_OUTPUT_02" type="DB_OUTPUT_TABLE" desc="node 3" dbConnection="CONNECT_03" outputFields="f1;f2;f3" renameOutputFields="c1;c2;c3" batchSize="1000">
    <Script name="sqlScript"><![CDATA[insert into db1.t1 (c1,c2,c3) values (?,?,?)]]></Script>
  </Node>
  -->
  <Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="DB_OUTPUT_01" order="0" metadata="METADATA_01"></Line>
  <Metadata id="METADATA_01">
    <Field name="c1" type="string" default="-1" nullable="false"/>
    <Field name="c2" type="int" default="-1" nullable="false"/>
    <Field name="tag_1" type="string" default="-1" nullable="false"/>
    <Field name="tag_4" type="string" default="-1" nullable="false"/>
  </Metadata>
  <Connection id="CONNECT_01" dbURL="http://127.0.0.1:58080" database="db1" username="user1" password="******" token="" org="hw" type="INFLUXDB_V1"/>
  <Connection id="CONNECT_02" dbURL="http://127.0.0.1:58086" database="db1" username="user1" password="******" token="" org="hw" type="INFLUXDB_V1"/>
  <!--<Connection id="CONNECT_04" dbURL="127.0.0.1:19000" database="db1" username="user1" password="******" type="CLICKHOUSE"/>-->
  <!--<Connection id="CONNECT_03" dbURL="127.0.0.1:3306" database="db1" username="user1" password="******" type="MYSQL"/>-->
  <!--<Connection id="CONNECT_03" database="d:/sqlite_db1.db" batchSize="10000" type="SQLITE"/>-->
</Graph>

Supported node types

Any read node can output to any write node

  • Input node - Read the data table
  • Output node - Write data table
  • Input node - Read excel file
  • Output node - Write excel file
  • Input node - Execute database script
  • Output node - Trash can, no output
  • Input node - MQ consumer
  • Output node - MQ producer
  • Data stream copy node, located between the input node and the output nodes; it acts as both output and input
  • Input node - Read redis
  • Output node - Write redis
  • Custom node, implements various operations by embedding a Go script
  • Input node - Execute system script node
  • Input node - Read CSV file node
  • Input node - Read PROMETHEUS node
  • Input node - PROMETHEUS EXPORTER node
  • Output node - Write PROMETHEUS node
  • Input node - Http node
  • Input node - Read elastic node
  • Output node - Write elastic node
  • Input node - Read hive node

Combination mode

  • Any input node can connect to any output node
  • Any input node can connect to a copy node
  • A copy node can connect to multiple output nodes
  • Any input node can connect to a transition node
  • A copy node cannot connect to a transition node (a minimal wiring sketch of these rules follows this list)
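
The sketch below is a minimal illustration of these rules, assembled from the samples elsewhere on this page; the node ids, connection ids, paths, and field names are assumed for illustration only. One input node feeds a copy node, and the copy node branches to two output nodes.

<Graph>
  <!-- input node: reads rows from the source -->
  <Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" fetchSize="1000">
    <Script name="sqlScript"><![CDATA[select * from t1]]></Script>
  </Node>
  <!-- copy node: duplicates the data stream -->
  <Node id="COPY_STREAM_01" type="COPY_STREAM" desc="data flow copy node"></Node>
  <!-- two output nodes fed by the two branches of the copy node -->
  <Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" dbConnection="CONNECT_02" outputFields="f1;f2" renameOutputFields="c1;c2" measurement="t1" rp="autogen"></Node>
  <Node id="XLS_WRITER_01" type="XLS_WRITER" fileURL="d:/demo/out.xlsx" startRow="2" metadataRow="1" sheetName="sheet1" outputFields="f1;f2" renameOutputFields="c1=A;c2=B"></Node>
  <Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="COPY_STREAM_01" order="1" metadata="METADATA_01"></Line>
  <Line id="LINE_02" type="COPY" from="COPY_STREAM_01:0" to="DB_OUTPUT_01" order="2" metadata="METADATA_01"></Line>
  <Line id="LINE_03" type="COPY" from="COPY_STREAM_01:1" to="XLS_WRITER_01" order="2" metadata="METADATA_01"></Line>
</Graph>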

Configuration description

DB_INPUT_TABLE

Input node

attribute | description
id | Unique mark
type | DB_INPUT_TABLE
script | sqlScript SQL statement
fetchSize | Number of records read per batch
dbConnection | Data source ID
desc | Description

Supported source types

MYSQL, Influxdb 1x, CK, PostgreSQL, Oracle, sqlite

sample

<Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="节点1" fetchSize="1000"><Script name="sqlScript"><![CDATA[select* from (select* from t4 limit 100000)]]></Script></Node>

XLS_READER

Input node

Read the EXCEL file

attribute | description
id | Unique mark
type | XLS_READER
fileURL | File path + file name
startRow | The row from which reading starts; index 0 means row 1 (usually the column header row)
sheetName | Sheet name
maxRow | Maximum number of rows to read; * means read all rows, 10 means read 10 rows
fieldMap | Field mapping in the format field1=A;field2=B;field3=C (field name = column letter); multiple fields are separated by semicolons

sample

<Node id="XLS_READER_01"   type="XLS_READER" desc="Input node 1"  fileURL="d:/demo/test1.xlsx" startRow="2" sheetName="Personnel information" fieldMap="field1=A;field2=B;field3=C"></Node>

DB_OUTPUT_TABLE

Output node

attribute | description | Suitable for
id | Unique mark
type | DB_OUTPUT_TABLE
script | insert, delete, update SQL statements | ck, mysql, sqlite, postgre, oracle
batchSize | Number of records committed per batch (note: for influx, the input node's fetchSize is used as the batch commit size) | ck, mysql, sqlite, postgre, oracle
outputFields | Field names passed in by the input node when reading data | influx, ck, mysql, sqlite, postgre, oracle
renameOutputFields | Field names written by the output node to the target data source | influx, ck, mysql, sqlite, postgre, oracle
dbConnection | Data source ID
desc | Description
outputTags | Tag names passed in by the input node when reading data | influx
renameOutputTags | Tag names written by the output node to the target data source | influx
rp | Retention policy name | influx
measurement | Measurement (table) name | influx
timeOffset | Time jitter offset used to generate non-repeating timestamps when writing in bulk (implemented via time.Sleep; consider adding a nanosecond time column via an embedded script, or adjusting your time + tags, instead) | influx

Supported target types

MYSQL, Influxdb 1x, CK, PostgreSQL, Oracle, sqlite

sample

<Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="write influx node1" dbConnection="CONNECT_02" outputFields="f1;f2;f3;f4" renameOutputFields="c1;c2;c3;c4" outputTags="tag1;tag2;tag3;tag4" renameOutputTags="tag_1;tag_2;tag_3;tag_4" measurement="t5" rp="autogen"></Node>
<Node id="DB_OUTPUT_02" type="DB_OUTPUT_TABLE" desc="write mysql node2" dbConnection="CONNECT_03" outputFields="time;f1;f2;f3;f4;tag1;tag2;tag3;tag4" renameOutputFields="time;c1;c2;c3;c4;tag_1;tag_2;tag_3;tag_4" batchSize="1000">
  <Script name="sqlScript"><![CDATA[insert into db1.t1 (time,c1,c2,c3,c4,tag_1,tag_2,tag_3,tag_4) values (?,?,?,?,?,?,?,?,?)]]></Script>
</Node>

XLS_WRITER

Output node

Write the contents of the EXCEL file

attribute | description
id | Unique mark
type | XLS_WRITER
fileURL | File path + file name
startRow | The row from which data is written; for example, 2 means writing starts at row 2
sheetName | Sheet name
outputFields | Field names passed in by the input node, format: field1;field2;field3
renameOutputFields | Field mapping, format: Index=B;Year=C;Region=D (field name = column letter); multiple fields are separated by semicolons
metadataRow | The row of the EXCEL file where the field names are written; for example, 1 means the field names start at row 1
appendRow | true means append mode, false means overwrite mode

sample

<Node id="XLS_WRITER_01"   type="XLS_WRITER" desc="Output node 2" appendRow="true"  fileURL="d:/demo/test2.xlsx" startRow="3" metadataRow="2" sheetName="Personnel information" outputFields="c1;c3;tag_1"  renameOutputFields="Index=B;Year=C;Region=D"></Node>

DB_EXECUTE_TABLE

Input node

Executes insert, delete, and update statements

attribute | description | Suitable for
id | Unique mark
type | DB_EXECUTE_TABLE
rollback | Whether to roll back: false means no rollback, true means roll back
sqlScript | insert, delete, update statements, separated by semicolons | mysql, sqlite, postgre, oracle, ck (delete and update not supported)
fileURL | External file; fileURL has a higher priority than sqlScript, and only one of the two can be used

sample

<Node id="DB_EXECUTE_01" dbConnection="CONNECT_01" type="DB_EXECUTE_TABLE" desc="node 1" rollback="false">
  <Script name="sqlScript"><![CDATA[
    insert into t_1 (uuid,name) values (13,'aaa');
    insert into t_1 (uuid,name) values (14,'bbb');
    insert into t_1 (uuid,name) values (15,'ccc');
    insert into t_1 (uuid,name) values (1,'aaa')
  ]]></Script>
</Node>

OUTPUT_TRASH

Output node

An empty pipe with no output; suitable as the target for nodes that produce no output (for example, the DB_EXECUTE_TABLE node)

sample

<Node id="OUTPUT_TRASH_01"   type="OUTPUT_TRASH" desc="node 2"></Node>

MQ_CONSUMER

Input node, blocking mode

MQ_CONSUMER(rocketmq support)

attribute | description | Suitable for
id | Unique mark
type | MQ_CONSUMER
flag | Default value: ROCKETMQ | rocketmq is supported
nameServer | Address of the mq server, in the format 127.0.0.1:8080
group | mq group name
topic | Subscribed topic name
tag | Tag name; * means all tags are consumed, tag_1 means only the tag_1 tag is consumed

sample

<Node id="MQ_CONSUMER_02" type="MQ_CONSUMER" flag="ROCKETMQ" nameServer="127.0.0.1:8080" group="group_1" topic="out_event_user_info" tag="*"></Node>

MQ_CONSUMER(kafka support)

attribute | description | Suitable for
id | Unique mark
type | MQ_CONSUMER
flag | Default value: KAFKA | kafka is supported
nameServer | Address of the mq server, in the format 127.0.0.1:8080
group | mq group name
topic | Subscribed topic name
listenerFlag | 1 means listen by partition; 2 means listen on a single channel, in which case topic can specify multiple topics

sample

<Node id="MQ_CONSUMER_03" type="MQ_CONSUMER" flag="KAFKA" nameServer="127.0.0.1:18081" group="group_10" topic="out_event_user_info" listenerFlag="2"></Node>

MQ_PRODUCER

Output node

MQ_PRODUCER(rocketmq support)

attribute | description | Suitable for
id | Unique mark
type | MQ_PRODUCER
flag | Default value: ROCKETMQ | rocketmq is supported
nameServer | Address of the mq server, in the format 127.0.0.1:8080
group | mq group name
topic | Topic name
tag | Tag name, in the format tag_1
sendFlag | Sending mode: 1 is synchronous, 2 is asynchronous, 3 is one-way
outputFields | Field names passed in by the input node, format: field1;field2;field3; multiple fields are separated by semicolons
renameOutputFields | Field mapping, format: field1;field2;field3; multiple fields are separated by semicolons

sample

<Node id="MQ_PRODUCER_01" type="MQ_PRODUCER" flag="ROCKETMQ" nameServer="127.0.0.1:8080" group="group_11" topic="out_event_system_user" tag="tag_1"          sendFlag="3" outputFields="time;tag_1;c2"  renameOutputFields="Time;Equipment;index"></Node>

MQ_PRODUCER (kafka support)

attribute | description | Suitable for
id | Unique mark
type | MQ_PRODUCER
flag | Default value: KAFKA | kafka is supported
nameServer | Address of the mq server, in the format 127.0.0.1:8080
topic | Topic name
isPartition | true sends messages to the specified partition; false sends messages to random partitions
sendFlag | Sending mode: 1 is synchronous, 2 is asynchronous
outputFields | Field names passed in by the input node, format: field1;field2;field3; multiple fields are separated by semicolons
renameOutputFields | Field mapping, format: field1;field2;field3; multiple fields are separated by semicolons

sample

<Node id="MQ_PRODUCER_02" type="MQ_PRODUCER" flag="KAFKA" nameServer="127.0.0.1:18081"  topic="out_event_system_user"          sendFlag="1" outputFields="Offset;Partition;Topic;Value"  renameOutputFields="Offset;Partition;Topic;Value"></Node>

COPY_STREAM

Copies the data stream of one input node to multiple branch output nodes

attribute | description | Suitable for
id | Unique mark
type | COPY_STREAM

sample

<Node id="COPY_STREAM_01" type="COPY_STREAM" desc="Data flow copy node"></Node>
<Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="COPY_STREAM_01" order="1" metadata="METADATA_01"></Line>
<Line id="LINE_02" type="COPY" from="COPY_STREAM_01:0" to="DB_OUTPUT_01" order="2" metadata="METADATA_01"></Line>
<Line id="LINE_03" type="COPY" from="COPY_STREAM_01:1" to="DB_OUTPUT_02" order="2" metadata="METADATA_02"></Line>

REDIS_READER

Input node

attribute | description | Suitable for
id | Unique mark
type | REDIS_READER
nameServer | Redis server address, for example 127.0.0.1:6379
password | Password, for example ******
db | Database ID, for example 0
isGetTTL | true or false; whether to read the TTL information
keys | Keys to read, separated by semicolons | Currently, only string, int, and float content can be read

sample

<Node id="REDIS_READER_01"   type="REDIS_READER" desc="Input node 1"   nameServer="127.0.0.1:6379" password="******" db="0" isGetTTL="true" keys="a1;a_1" ></Node>

REDIS_WRITER

Output node. Because key names cannot repeat, only the last row from the read node is suitable for writing.

attribute | description | Suitable for
id | Unique mark
type | REDIS_WRITER
nameServer | Redis server address, for example 127.0.0.1:6379
password | Password, for example ******
db | Database ID, for example 0
isGetTTL | true or false; whether to read the TTL information
outputFields | Currently, only string, int, and float content is supported
renameOutputFields | Currently, only string, int, and float content is supported

sample

<Node id="REDIS_WRITER_01"   type="REDIS_WRITER" desc="Output node 1"  nameServer="127.0.0.1:6379" password="******" db="1"   isGetTTL="true" outputFields="a1;a_1"  renameOutputFields="f1;f2"  ></Node>

CUSTOM_READER_WRITER

Custom node; various operations can be implemented by embedding Go scripts

attribute | description | Suitable for
id | Unique mark
type | CUSTOM_READER_WRITER

EXECUTE_SHELL

Input Node - Execute system script node

attribute | description | Suitable for
id | Unique mark
type | EXECUTE_SHELL
fileURL | Location of an external script file | Only one of fileURL and Script takes effect; when both appear, fileURL takes precedence over Script
Script | Script content
outLogFileURL | Log file to which the console output is written

sample

<Node id="EXECUTE_SHELL_01"  type="EXECUTE_SHELL" desc="node 1"  _fileURL="d:/test1.bat" outLogFileURL="d:/test1_log.txt"><Script><![CDATA[    c:    dir/w]]></Script></Node>

CSV_READER

Input node - Read CSV file node

attribute | description | Suitable for
id | Unique mark
type | CSV_READER
fileURL | CSV file location
fetchSize | Number of records read into memory per batch | For influxdb this can match the number of records committed per batch; in a test with 10,000 rows of 123 fields, fetchSize=100 loaded the data in about 15 seconds
startRow | The row from which data is read; 0 (the default) means row 1 | Row 0 is usually the column header row
fields | Defines the output field names, separated by semicolons | field1;field2;field3
fieldsIndex | Defines the columns to output; 0 means column 1; multiple columns are separated by semicolons; -1 means all columns are read in order | "2;3;4"

sample

<Node id="CSV_READER_01"   type="CSV_READER" desc="Input node 1" fetchSize="5"  fileURL="d:/demo2.csv" startRow="1" fields="field1;field2;field3"  fieldsIndex="0;3;4"></Node>

PROMETHEUS_API_READER

Input Node - Read PROMETHEUS node

attribute | description | Suitable for
id | Unique mark
type | PROMETHEUS_API_READER
url | prometheus server address | For example: http://127.0.0.1:9090
Script | Query API content; only /api/v1/query and /api/v1/query_range are supported | For example: /api/v1/query?query=my_device_info{deviceCode="DeviceNumber000"}[1d]

Note: in the result set returned by the query, __name__ is the measure name, __TIME__ is the timestamp stored in prometheus, and __VALUE__ is the prometheus value.

<Node id="PROMETHEUS_API_READER_1" type="PROMETHEUS_API_READER" url="http://127.0.0.1:9090">
  <Script name="sqlScript"><![CDATA[ /api/v1/query?query=my_device_info{deviceCode="DeviceNumber000"}[1d] ]]></Script>
</Node>
<Node id="DB_OUTPUT_TABLE_1" type="DB_OUTPUT_TABLE" batchSize="10" dbConnection="CONNECT_1" desc="" outputFields="__name__;address;deviceCode;__TIME__;__VALUE__" renameOutputFields="f1;f2;f3;f4;f5">
  <Script name="sqlScript"><![CDATA[insert into t_prome_info_bk(f1,f2,f3,f4,f5) values (?,?,?,?,?)]]></Script>
</Node>

PROMETHEUS_EXPORTER

Input node - PROMETHEUS EXPORTER

attribute | description | Suitable for
id | Unique mark
type | PROMETHEUS_EXPORTER
exporterAddr | The exporter address, IP:PORT | For example: :10000
exporterMetricsPath | The exporter's path | For example: /EtlEngineExport
metricName | Metric name | For example: Etl_Engine_Exporter
metricHelp | Metric description | For example: Etl_Engine_Exporter samples
labels | Label names | For example: deviceCode;address;desc

sample

<Node id="PROMETHEUS_EXPORTER_1" type="PROMETHEUS_EXPORTER" exporterAddr=":10000" exporterMetricsPath="/EtlEngineExport" metricName="Etl_Engine_Exporter" metricHelp="Etl_Engine_Exporter samples" labels="deviceCode;address;desc"></Node>

Add the following contents to the prometheus configuration file:

  - job_name: "etlengine_exporter"
    metrics_path: "/EtlEngineExport"
    static_configs:
      - targets: ["127.0.0.1:10000"]

A service address /pushDataService is also exposed for pushing data. Postman debugging details are as follows:

  POST  http://127.0.0.1:10000/pushDataService
  Body (x-www-form-urlencoded) parameter:
  "jsondata": {"labels":{"deviceCode":"DeviceCode001","address":"District_1","desc":"Maximum value"},"value":100}

Two fields are automatically added to the output data stream: __name__ (the measure name) and __VALUE__ (the prometheus value).

PROMETHEUS_API_WRITER

Output Node - Write PROMETHEUS node

attribute | description | Suitable for
id | Unique mark
type | PROMETHEUS_API_WRITER
url | prometheus server address | For example: http://127.0.0.1:9090
metric | Metric name
outputFields | Field names passed in by the input node
renameOutputFields | Names of the labels written to prometheus; the items correspond one to one with outputFields
valueField | The field whose value is written as the prometheus value; it must be an existing field name in the input node

sample

<Node id="DB_INPUT_TABLE_1" type="DB_INPUT_TABLE" fetchSize="1000" dbConnection="CONNECT_1">
  <Script name="sqlScript"><![CDATA[select f2,f3,f4 from t_prome_info]]></Script>
</Node>
<Node id="PROMETHEUS_API_WRITER_1" type="PROMETHEUS_API_WRITER" url="http://127.0.0.1:9090" metric="my_device_info" outputFields="f2;f3" renameOutputFields="deviceCode;address" valueField="f4"></Node>

HTTP_INPUT_SERVICE

Input node - Http node, blocking mode

attribute | description | Suitable for
id | Unique mark
type | HTTP_INPUT_SERVICE
serviceIp | IP address to which the HTTP/HTTPS service is bound
servicePort | Port to which the HTTP/HTTPS service is bound
serviceName | Name of the exposed service | Default: etlEngineService
serviceCertFile | Location of the HTTPS service certificate file
serviceKeyFile | Location of the HTTPS service key file

sample

<Node id="" type="HTTP_INPUT_SERVICE" serviceIp="" servicePort="8081" serviceName="etlEngineService" serviceCertFile="" serviceKeyFile=""></Node>

Postman debugging:

  POST  http://127.0.0.1:8081/etlEngineService
  Body (x-www-form-urlencoded) parameter:
  "jsondata": {"rows":[{"deviceCode":"DeviceCode001","address":"Chaoyang District","desc":"Maximum value","value":20},{"deviceCode":"DeviceCode002","address":"Chaoyang District","desc":"Maximum value","value":18}]}

  Note: You must pass an array structure with rows as the KEY.

ELASTIC_READER

Input node - Read elastic node

attribute | description | Suitable for
id | Unique mark
type | ELASTIC_READER
index | Index name
sourceFields | Names of the fields output in the result set
fetchSize | Number of records read per batch
Script tag | DSL query syntax

sample

<Node id="ELASTIC_READER_01" dbConnection="CONNECT_02" type="ELASTIC_READER" desc="node 2" sourceFields="custom_type;username;desc;address" fetchSize="2">
  <Script name="sqlScript"><![CDATA[
    {"query": {"bool":{"must":[
      //{
      //"term": {"username.keyword":"Mr.Wang" }
      //"match": {"username":"" }
      //},
      {"term": {"custom_type":"t_user_info" } }
    ] } } }
  ]]></Script>
</Node>
<Connection id="CONNECT_02" type="ELASTIC" dbURL="http://127.0.0.1:9200" database="es_db3" username="elastic" password="******" />

ELASTIC_WRITER

Output node - Write elastic node

attribute | description | Suitable for
id | Unique mark
type | ELASTIC_WRITER
index | Index name
idType | Primary key mode: 1 means no id is specified and es generates a 20-character GUID by itself; 2 means the field name specified in idExpress is used, and the value is taken from the field of the same name in the previous node's renameOutputFields; 3 means the value is generated by the expression specified in idExpress (the _HW_UUID32 expression automatically generates a 32-character UUID primary key)
idExpress | For example, when idType is 3, this parameter can be set to _HW_UUID32
outputFields | Field names passed in by the input node when reading data | Field contents are output in order, not by field name
renameOutputFields | Field names written by the output node to the target data source | Field contents are output in order, not by field name

sample

<Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="node 1" fetchSize="2">
  <Script name="sqlScript"><![CDATA[SELECT "t_user_info" AS custom_type,uname, udesc,uaddress,uid FROM t_u_info]]></Script>
</Node>
<Node id="ELASTIC_WRITER_01" dbConnection="CONNECT_02" type="ELASTIC_WRITER" desc="node 2" outputFields="custom_type;uname;udesc;uaddress;uid" renameOutputFields="custom_type;username;desc;address;uid" idType="3" idExpress="_HW_UUID32"></Node>
<Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="ELASTIC_WRITER_01" order="0" metadata="METADATA_01"></Line>
<Metadata id="METADATA_01">
  <Field name="custom_type" type="string" default="-1" nullable="false"/>
  <Field name="username" type="string" default="-1" nullable="false"/>
  <Field name="desc" type="string" default="-1" nullable="false"/>
  <Field name="address" type="string" default="-1" nullable="false"/>
  <Field name="uid" type="string" default="-1" nullable="false"/>
</Metadata>
<Connection id="CONNECT_02" type="ELASTIC" dbURL="http://127.0.0.1:9200" database="es_db3" username="elastic" password="******" />

HIVE_READER

Input node - Read hive node

Authentication

hive.server2.authentication = NONE
hive.server2.authentication = KERBEROS

attribute | description
id | Unique mark
type | HIVE_READER
script | sqlScript SQL statements
fetchSize | Number of records read per batch
dbConnection | Data source ID
authFlag | Authentication type: NONE or KERBEROS; the default is NONE
krb5Principal | Kerberos principal, for example the hive/_HOST@EXAMPLE.COM value of hive.server2.authentication.kerberos.principal in hive
desc | Description

sample

<Node id="HIVE_READER_01" dbConnection="CONNECT_01" type="HIVE_READER" desc="node 1" fetchSize="100">
  <Script name="sqlScript"><![CDATA[select * from db1.t_u_info]]></Script>
</Node>
<Connection id="CONNECT_01" dbURL="127.0.0.1:10000" database="db1" username="root" password="******" batchSize="1000" type="HIVE"/>

Metadata

The Metadata definition describes the target data format (the names and types of the fields corresponding to the renameOutputFields or renameOutputTags defined in the output node). outputFields lists the field names of the result set in the input node. Fields defined by outputFields are converted to the fields defined by renameOutputFields, and the target format of those renameOutputFields fields is defined in the Metadata element.

attribute | description | Suitable for
id | Unique mark
Field.name | The field name in the output data source | renameOutputFields, renameOutputTags
Field.type | The field type in the output data source | string, int, int32, float, str_timestamp, decimal, datetime, timestamp
Field.default | Default value | When nullable is false and the output value is an empty string, default specifies the value to output
Field.nullable | Whether null is allowed | false: cannot be null and must be used together with default; true: null is allowed
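
A minimal sketch of how the pieces fit together, based on the example configuration file above (node ids, connection ids, and field names are illustrative only): the input node emits f1/f2 via outputFields, the output node renames them to c1/c2 via renameOutputFields, and the Metadata referenced by the Line describes the target format of c1/c2.

<Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" dbConnection="CONNECT_02" outputFields="f1;f2" renameOutputFields="c1;c2" measurement="t1" rp="autogen"></Node>
<Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="DB_OUTPUT_01" order="0" metadata="METADATA_01"></Line>
<Metadata id="METADATA_01">
  <!-- c1/c2 are the renamed target fields; type, default and nullable describe the target format -->
  <Field name="c1" type="string" default="-1" nullable="false"/>
  <Field name="c2" type="int" default="-1" nullable="false"/>
</Metadata>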

Connection

attribute | description | Suitable for
id | Unique mark
type | Data source type | INFLUXDB_V1, MYSQL, CLICKHOUSE, SQLITE, POSTGRES, ORACLE, ELASTIC
dbURL | Connection address | ck, mysql, influx, postgre, oracle, elastic
database | Database name | ck, mysql, influx, sqlite, postgre, oracle, elastic
username | User name | ck, mysql, influx, postgre, oracle, elastic
password | Password | ck, mysql, influx, postgre, oracle, elastic
token | Token name | influx 2x
org | Organization name | influx 2x
rp | Name of the data retention policy | influx 1x

Graph

attribute | description | Suitable for
runMode | 1: serial mode; 2: parallel mode | Parallel mode is recommended by default; if you want the flows to execute in order, use serial mode

Line

attribute | description | Suitable for
id | Unique mark
from | Unique mark of the input node
to | Unique mark of the output node
type | STANDARD: one in, one out; COPY: copies the data stream in an intermediate link
order | Serial sequence number, an ascending positive integer; when the Graph attribute runMode is 1, configuring 0, 1, 2 makes the lines execute serially
metadata | ID of the target metadata
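
A minimal sketch of how Graph.runMode and Line.order interact (node and metadata ids are assumed for illustration): with runMode="1" the lines execute serially in ascending order of their order attribute.

<Graph runMode="1">
  <!-- with runMode="1", LINE_01 (order 0) executes before LINE_02 (order 1) -->
  <Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="DB_OUTPUT_01" order="0" metadata="METADATA_01"></Line>
  <Line id="LINE_02" type="STANDARD" from="DB_INPUT_02" to="DB_OUTPUT_02" order="1" metadata="METADATA_02"></Line>
  <!-- node, metadata, and connection definitions omitted -->
</Graph>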

You can configure global variables

Global variables are passed on the command line

etl_engine -fileUrl ./global6.xml -logLevel debug arg1="d:/test3.xlsx" arg2=上海

arg1 and arg2 are global variables passed in from the command line

  • Global variables are referenced in configuration files

<Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="node 1" fetchSize="500">
  <Script name="sqlScript"><![CDATA[select * from (select * from t5 where tag_1='${arg2}' limit 1000)]]></Script>
</Node>
<Node id="XLS_WRITER_01" type="XLS_WRITER" desc="output node 2" appendRow="true" fileURL="${arg1}" startRow="3" metadataRow="2" sheetName="Personnel information" outputFields="c1;c3;tag_1" renameOutputFields="Index=B;Year=C;Region=D"></Node>

${arg1} in the configuration file is replaced with the value of arg1 (d:/test3.xlsx) at runtime.
${arg2} in the configuration file is replaced with the value of arg2 (上海) at runtime.

  • Built-in variable specification

To facilitate the generation of fixed-format content, the system has built-in common variables, which can be used to dynamically replace variable values when configuring global variables.
Built-in variables use the prefix _HW_

  1. Time variable
    Format: _HW_YYYY-MM-DD hh:mm:ss.SSS
    Outputs the current system time, for example 2022-01-02 19:33:06.108
    Note that spaces are escaped as 0x32, so the correct way to pass it is
    _HW_YYYY-MM-DD0x32hh:mm:ss.SSS
    YYYY outputs the four-digit year, e.g. 2022
    MM outputs the two-digit month, e.g. 01
    DD outputs the two-digit day, e.g. 02
    hh outputs the two-digit hour, e.g. 19
    mm outputs the two-digit minute, e.g. 33
    ss outputs the two-digit second, e.g. 06
    .SSS outputs a dot followed by three-digit milliseconds, e.g. .108

The above parts can be combined at will; for example, _HW_YYYYMM outputs 202201.

  2. Time displacement variable
    On the basis of the time variable, the uppercase Z character adds or subtracts hours, minutes, and seconds.

    For example, the format _HW_YYYY-MM-DD hh:mm:ss.SSSZ2h45m
    outputs the time 2 hours and 45 minutes after the current time.
    For example, the format _HW_YYYY-MM-DD hh:mm:ssZ-24h10m
    uses a negative number after the uppercase Z to reduce the displacement;
    it outputs the current time minus 24 hours and 10 minutes.
    The supported time units are:
    "ns", "us" (or "µs"), "ms", "s", "m", "h"

    On the basis of the time variable, the lowercase z character adds or subtracts years, months, and days.

    For example, the format _HW_YYYY-MM-DD hh:mm:ssz1,2,3
    outputs the current time plus 1 year, 2 months, and 3 days.
    For example, the format _HW_YYYY-MM-DD hhz-1,-2,-3
    outputs the current time minus 1 year, 2 months, and 3 days.

  3. Timestamp variable
    Format: _HW_timestamp10
    Outputs the current 10-digit system timestamp, for example 1669450716
    Format: _HW_timestamp13
    Outputs the current 13-digit system timestamp, for example 1669450716142
    Format: _HW_timestamp16
    Outputs the current 16-digit system timestamp, for example 1669450716142516
    Format: _HW_timestamp19
    Outputs the current 19-digit system timestamp, for example 1669450716142516700

  4. UUID variables
    Format: _HW_UUID32
    Outputs a 32-character uppercase UUID, for example D54C3C7163844E4DB4F073E8EEC83328
    Format: _HW_uuid32
    Outputs a 32-character lowercase UUID, for example d54c3c7163844e4db4f073e8eec83328

  • Pass built-in variables on the command line

etl_engine -fileUrl ./global6.xml -logLevel debug arg1=_HW_YYYY-MM-DD0x32hh:mm:ss.SSS arg2=_HW_YYYY-MM-DD
  • Built-in variables are referenced in configuration files

<Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="node 1" fetchSize="500">
  <Script name="sqlScript"><![CDATA[select * from (select * from t5 where tag_1='${arg1}' limit 1000)]]></Script>
</Node>
<Node id="XLS_WRITER_01" type="XLS_WRITER" desc="output node 2" appendRow="true" fileURL="${arg2}.xlsx" _fileURL="d:/demo/test2.xlsx" startRow="3" metadataRow="2" sheetName="Personnel information" outputFields="c1;c3;tag_1" renameOutputFields="Index=B;Year=C;Region=D"></Node>

Support for parsing embedded Go language

You can embed your own business logic in the <BeforeOut></BeforeOut> tag of any output node. More introduction

  • Add field

Multiple fields can be added and given default values

package ext

import (
    "github.com/tidwall/gjson"
    "github.com/tidwall/sjson"
)

func RunScript(dataValue string) (result string, topErr error) {
    newRows := ""
    rows := gjson.Get(dataValue, "rows")
    for _, row := range rows.Array() {
        //tmpStr, _ := sjson.Set(row.String(), "addCol1", time.Now().Format("2006-01-02 15:04:05.000"))
        tmpStr, _ := sjson.Set(row.String(), "addCol1", "1")
        tmpStr, _ = sjson.Set(tmpStr, "addCol2", "${arg2}")
        newRows, _ = sjson.SetRaw(newRows, "rows.-1", tmpStr)
    }
    return newRows, nil
}
  • Merge field

Multiple fields can be combined into a single field

package ext

import (
    "github.com/tidwall/gjson"
    "github.com/tidwall/sjson"
)

func RunScript(dataValue string) (result string, topErr error) {
    newRows := ""
    rows := gjson.Get(dataValue, "rows")
    for _, row := range rows.Array() {
        area := gjson.Get(row.String(), "tag_1").String()
        year := gjson.Get(row.String(), "c3").String()
        tmpStr, _ := sjson.Set(row.String(), "tag_1", area+"_"+year)
        newRows, _ = sjson.SetRaw(newRows, "rows.-1", tmpStr)
    }
    return newRows, nil
}
  • Complete sample

<?xml version="1.0" encoding="UTF-8"?>
<Graph>
  <Node id="CSV_READER_01" type="CSV_READER" desc="node 1" fetchSize="500" fileURL="d:/demo.csv" startRow="1" fields="field1;field2;field3;field4" fieldsIndex="0;1;2;3"></Node>
  <Node id="OUTPUT_TRASH_01" type="OUTPUT_TRASH" desc="node 2">
    <BeforeOut>
      <![CDATA[
package ext

import (
    "errors"
    "fmt"
    "strconv"
    "strings"
    "time"
    "github.com/tidwall/gjson"
    "etl-engine/etl/tool/extlibs/common"
    "io/ioutil"
    "os"
)

func RunScript(dataValue string) (result string, topErr error) {
    defer func() {
        if topLevelErr := recover(); topLevelErr != nil {
            topErr = errors.New("RunScript Capture fatal error" + topLevelErr.(error).Error())
        } else {
            topErr = nil
        }
    }()
    newRows := ""
    GenLine(dataValue, "db1", "autogen", "t13", "field2", "field3;field4")
    return newRows, nil
}

// Received is JSON
func GenLine(dataValue string, db string, rp string, measurement string, fields string, tags string) error {
    head := "# DML\n# CONTEXT-DATABASE: " + db + "\n# CONTEXT-RETENTION-POLICY: " + rp + "\n\n"
    line := ""
    fieldLine := ""
    tagLine := ""
    _t_ := strings.Split(tags, ";")
    _f_ := strings.Split(fields, ";")
    rows := gjson.Get(dataValue, "rows")
    for _, row := range rows.Array() {
        fieldLine = ""
        tagLine = ""
        for i1 := 0; i1 < len(_t_); i1++ {
            tagValue := gjson.Get(row.String(), _t_[i1])
            tagLine = tagLine + _t_[i1] + "=\"" + tagValue.String() + "\","
        }
        tagLine = tagLine[0 : len(tagLine)-1]
        for i1 := 0; i1 < len(_f_); i1++ {
            fieldValue := gjson.Get(row.String(), _f_[i1])
            fieldLine = fieldLine + _f_[i1] + "=" + fieldValue.String() + ","
        }
        fieldLine = fieldLine[0 : len(fieldLine)-1]
        if len(tagLine) > 0 && len(fieldLine) > 0 {
            line = line + measurement + "," + tagLine + " " + fieldLine + " " + strconv.FormatInt(time.Now().Add(500*time.Millisecond).UnixNano(), 10) + "\n"
        } else {
            if len(fieldLine) > 0 {
                line = line + measurement + "," + fieldLine + " " + strconv.FormatInt(time.Now().Add(500*time.Millisecond).UnixNano(), 10) + "\n"
            }
        }
    }
    if len(line) > 0 {
        txt := head + line
        fileName := "d:/" + strconv.FormatInt(time.Now().UnixNano(), 10)
        WriteFileToDB(fileName, txt)
        err1 := os.Remove(fileName)
        if err1 != nil {
            fmt.Println("delete temp file fail:", fileName)
            return err1
        }
    }
    return nil
}

func WriteFileToDB(fileName string, txt string) {
    buf := []byte(txt)
    err := ioutil.WriteFile(fileName, buf, 0666)
    if err != nil {
        fmt.Println("write file fail:", err)
        return
    } else {
        cmdLine := "D:/software/influxdb-1.8.10-1/influx.exe -import -path=" + fileName + " -host 127.0.0.1 -port 58086 -username u1 -password 123456 -precision=ns"
        //fmt.Println("cmdLine:", cmdLine)
        common.Command3("GB18030", "cmd", "/c", cmdLine)
    }
}
      ]]>
    </BeforeOut>
  </Node>
  <Line type="STANDARD" from="CSV_READER_01" to="OUTPUT_TRASH_01" order="0" metadata="METADATA_03">line label</Line>
  <Metadata id="METADATA_03">
    <Field name="field1" type="string" default="-1" nullable="false"/>
    <Field name="field2" type="string" default="-1" nullable="false"/>
    <Field name="field3" type="string" default="-1" nullable="false"/>
    <Field name="field4" type="string" default="-1" nullable="false"/>
  </Metadata>
</Graph>

Cooperative mode

Integration and cooperation are welcome

etl-engine industry-wide connection...

@auth Mr Huang  mail: hw2499@sohu.com  vx: weigeonlyyou