DataX插件开发-KafkaWriter

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: DataX插件开发-KafkaWriter

下载源码

下载源码:https://github.com/alibaba/DataX/releases/tag/datax_v202210

这里使用的是 : datax_v202210 版本

DataX使用手册:https://github.com/alibaba/DataX/blob/master/introduction.md

DataX插件说明:https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md

插件开发

创建kafkawriter模块

pom.xml

主要添加kafka-clients依赖,datax一些默认依赖,内容如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>datax-all</artifactId>
        <groupId>com.alibaba.datax</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>kafkawriter</artifactId>

    <properties>
        <kafka.version>1.1.1</kafka.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${jdk-version}</source>
                    <target>${jdk-version}</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <!-- assembly plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

说明:

  • maven-assembly-plugin
首先我们要了解Datax打包的方式:Maven-assembly-plugin
1、作用:要想将写的程序和它本身所依赖的jar包一起build到一个包里,是maven中针对打包任务而提供的标准插件。
2、其他作用:
  1)提供一个把工程依赖元素、模块、网站文档等其他文件存放到单个归档文件里。
  2)打包成指定格式分发包,支持各种主流的格式如zip、tar.gz、jar和war等,具体打包哪些文件是高度可控的。
  3)能够自定义包含/排除指定的目录或文件。
  总体来说,实现插件maven-assembly-plugin需要两个步骤:
  第1步骤:pom.xml文件里配置maven-assembly-plugin,指定描述文件
  第2步骤:描述文件配置具体参数

在kafkawriter的pom文件中需要新增 
   <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
           <configuration>
               <descriptors> <!--描述文件路径-->
                   <descriptor>src/main/assembly/package.xml</descriptor>
               </descriptors> 
               <finalName>datax</finalName>
           </configuration>
           <executions>
               <execution>
                   <id>dwzip</id>
                   <phase>package</phase> <!-- 绑定到package生命周期阶段上 -->
                   <goals>
                       <goal>single</goal> <!-- 只运行一次 -->
                   </goals>
               </execution>
           </executions>
    </plugin>

plugin.sjon

存放目录 src/main/resources/plugin.json

{
   
    "name": "kafkawriter",
    "class": "com.alibaba.datax.plugin.writer.KafkaWriter",
    "description": "Kafka Writer",
    "developer": "Jast"
}

package.xml

存在目录src/main/assembly/package.xml

<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
            </includes>
            <outputDirectory>plugin/writer/kafkawriter</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>kafkawriter-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/writer/kafkawriter</outputDirectory>
        </fileSet>
    </fileSets>

    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/writer/kafkawriter/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>

com.alibaba.datax.plugin.writer.KafkaWriter
package com.alibaba.datax.plugin.writer;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;


/**
 *
 Job和Task之间一定不能有共享变量,因为分布式运行时不能保证共享变量会被正确初始化。两者之间只能通过配置文件进行依赖。
 prepare和post在Job和Task中都存在,插件需要根据实际情况确定在什么地方执行操作。
 * @author mac
 */
public class KafkaWriter extends Writer {
   


    public static class Job extends Writer.Job {
   

        private static final Logger log = LoggerFactory.getLogger(Job.class);

        private Configuration conf = null;

        /**
         * init: Job对象初始化工作,此时可以通过super.getPluginJobConf()获取与本插件相关的配置。
         * 读插件获得配置中reader部分,写插件获得writer部分。
         */
        @Override
        public void init() {
   
            this.conf = super.getPluginJobConf();//获取配置文件信息{parameter 里面的参数}
            log.info("kafka writer params:{}", conf.toJSON());
            //校验 参数配置
            this.validateParameter();
        }


        private void validateParameter() {
   
            //toipc 必须填
            this.conf
                    .getNecessaryValue(
                            Key.TOPIC,
                            KafkaWriterErrorCode.REQUIRED_VALUE);


            this.conf
                    .getNecessaryValue(
                            Key.BOOTSTRAP_SERVERS,
                            KafkaWriterErrorCode.REQUIRED_VALUE);

        }

        /**
         * prepare: 全局准备工作,比如odpswriter清空目标表。
         */
        @Override
        public void prepare() {
   

        }

        /**
         * split: 拆分Task。参数adviceNumber框架建议的拆分数,一般是运行时所配置的并发度。值返回的是Task的配置列表。
         * @param mandatoryNumber
         *            为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错!
         *
         * @return
         */
        @Override
        public List<Configuration> split(int mandatoryNumber) {
   
            //按照reader 配置文件的格式  来 组织相同个数的writer配置文件
            List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
            for (int i = 0; i < mandatoryNumber; i++) {
   
                configurations.add(conf);
            }
            return configurations;
        }


        /**
         * post: 全局的后置工作,比如mysqlwriter同步完影子表后的rename操作。
         */
        @Override
        public void post() {
   
                    log.info("job destroy ");
        }

        /**
         * destroy: Job对象自身的销毁工作。
         */
        @Override
        public void destroy() {
   

        }

    }


    public static class Task extends Writer.Task {
   
        private static final Logger log = LoggerFactory.getLogger(Task.class);

        private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");

        private Producer<String, String> producer;

        private String fieldDelimiter;

        private Configuration conf;

        /**
         * init:Task对象的初始化。此时可以通过super.getPluginJobConf()获取与本Task相关的配置。这里的配置是Job的split方法返回的配置列表中的其中一个。
         */
        @Override
        public void init() {
   
            this.conf = super.getPluginJobConf();
            fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
            //初始化kafka
            Properties props = new Properties();
            props.put("bootstrap.servers", conf.getString(Key.BOOTSTRAP_SERVERS));
            props.put("acks", "all");//这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
            props.put("retries", 0);
            // Controls how much bytes sender would wait to batch up before publishing to Kafka.
            //控制发送者在发布到kafka之前等待批处理的字节数。
            //控制发送者在发布到kafka之前等待批处理的字节数。 满足batch.size和ling.ms之一,producer便开始发送消息
            //默认16384   16kb
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer(props);

        }

        /**
         * prepare:局部的准备工作。
         */
        @Override
        public void prepare() {
   
            super.prepare();
        }

        /**
         * startWrite:从RecordReceiver中读取数据,写入目标数据源。RecordReceiver中的数据来自Reader和Writer之间的缓存队列。
         * @param lineReceiver
         */
        @Override
        public void startWrite(RecordReceiver lineReceiver) {
   

            log.info("start to writer kafka");
            Record record = null;
            while ((record = lineReceiver.getFromReader()) != null) {
   //说明还在读取数据,或者读取的数据没处理完
                //获取一行数据,按照指定分隔符 拼成字符串 发送出去
                producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                        recordToString(record), recordToString(record)));

            }
        }

        /**
         * destroy: Task象自身的销毁工作。
         */
        @Override
        public void destroy() {
   
            log.info("Waiting for message to be successfully sent");
            producer.flush();
            log.info("Message sent successfully");
            if (producer != null) {
   
                producer.close();
            }
        }


        private String recordToString(Record record) {
   
            int recordLength = record.getColumnNumber();
            if (0 == recordLength) {
   
                return NEWLINE_FLAG;
            }

            Column column;
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < recordLength; i++) {
   
                column = record.getColumn(i);
                sb.append(column.asString()).append(fieldDelimiter);
            }
            sb.setLength(sb.length() - 1);
            sb.append(NEWLINE_FLAG);

            return sb.toString();
        }

    }


}
com.alibaba.datax.plugin.writer.KafkaWriterErrorCode
package com.alibaba.datax.plugin.writer;

import com.alibaba.datax.common.spi.ErrorCode;

public enum KafkaWriterErrorCode implements ErrorCode {
   
    REQUIRED_VALUE("KafkaWriter-00", "Required parameter is not filled .")
    ;

    private final String code;
    private final String description;

    KafkaWriterErrorCode(String code, String description) {
   
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
   
        return this.code;
    }

    @Override
    public String getDescription() {
   
        return this.description;
    }

    @Override
    public String toString() {
   
        return String.format("Code:[%s], Description:[%s]. ", this.code,
                this.description);
    }
}
com.alibaba.datax.plugin.writer.Key
package com.alibaba.datax.plugin.writer;

public class Key {
   
    public static final String FIELD_DELIMITER = "fieldDelimiter";
    public static final String BOOTSTRAP_SERVERS = "bootstrapServers";
    public static final String TOPIC = "topic";
}

在DataX项目根目录下修改package.xml文件

fileSets中添加

 <fileSet>
            <directory>kafkawriter/target/datax/</directory>
            <includes>
                <include>**/*.*</include>
            </includes>
            <outputDirectory>datax</outputDirectory>
        </fileSet>

打包

mvn -U clean package assembly:assembly -Dmaven.test.skip=true

或者IDEA中直接打包

生成打插件在目录DataX-datax_v202210/kafkawriter/target/datax/plugin

安装Datax

下载DataX

wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz

解压

tar -zxvf datax.tar.gz

上传自定义KafkaWriter

将自己开发的plugin目录上传到DataX工具目录下,并解压

[hadoop@10 ~/datax]$ ll plugin/writer/
total 148
.....
drwxr-xr-x 3 hadoop hadoop 4096 Dec  9 15:22 kafkawriter
.....

创建启动配置文件text2kafka.json

实现功能:读取文本内容,将数据发送到Kafka

{
   
    "setting": {
   },
    "job": {
   
        "setting": {
   
            "speed": {
   
                "channel": 2
            }
        },
        "content": [
            {
   
                "reader": {
   
                    "name": "txtfilereader",
                    "parameter": {
   
                        "path": ["/home/hadoop/data.txt"],
                        "encoding": "UTF-8",
                        "column": [

                        ],
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
   
                    "name": "kafkawriter",
                    "parameter": {
   
                        "topic": "behavior_test",
                        "bootstrapServers": "10.16.0.2:9092",
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}

启动

python bin/datax.py job/text2kafka.json

可能报错:

2022-12-09 15:18:30.412 [main] WARN ConfigParser - 插件[txtfilereader,kafkawriter]加载失败,1s后重试... Exception:Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/home/hadoop/datax/plugin/writer/.DS_Store/plugin.json]不存在. 请检查您的配置文件.

原因,Mac电脑打包自己默认将.DS_Store打进去了,需要删除,不然DataX会解析失败

执行结果

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2022-12-09 15:23:56.947 [main] INFO  MessageSource - JVM TimeZone: GMT+08:00, Locale: zh_CN
2022-12-09 15:23:56.949 [main] INFO  MessageSource - use Locale: zh_CN timeZone: sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]
2022-12-09 15:23:56.959 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2022-12-09 15:23:56.963 [main] INFO  Engine - the machine info  => 

        osInfo: Tencent 1.8 25.282-b1
        jvmInfo:        Linux amd64 5.4.119-19-0007
        cpu num:        8

        totalPhysicalMemory:    -0.00G
        freePhysicalMemory:     -0.00G
        maxFileDescriptorCount: -1
        currentOpenFileDescriptorCount: -1

        GC Names        [PS MarkSweep, PS Scavenge]

        MEMORY_NAME                    | allocation_size                | init_size                      
        PS Eden Space                  | 256.00MB                       | 256.00MB                       
        Code Cache                     | 240.00MB                       | 2.44MB                         
        Compressed Class Space         | 1,024.00MB                     | 0.00MB                         
        PS Survivor Space              | 42.50MB                        | 42.50MB                        
        PS Old Gen                     | 683.00MB                       | 683.00MB                       
        Metaspace                      | -0.00MB                        | 0.00MB                         


2022-12-09 15:23:56.976 [main] INFO  Engine - 
{
        "content":[
                {
                        "reader":{
                                "name":"txtfilereader",
                                "parameter":{
                                        "column":[],
                                        "encoding":"UTF-8",
                                        "fieldDelimiter":",",
                                        "path":[
                                                "/home/hadoop/data.txt"
                                        ]
                                }
                        },
                        "writer":{
                                "name":"kafkawriter",
                                "parameter":{
                                        "bootstrapServers":"10.16.0.2:9092",
                                        "fieldDelimiter":",",
                                        "topic":"behavior_test"
                                }
                        }
                }
        ],
        "setting":{
                "speed":{
                        "channel":2
                }
        }
}

2022-12-09 15:23:56.988 [main] WARN  Engine - prioriy set to 0, because NumberFormatException, the value is: null
2022-12-09 15:23:56.989 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2022-12-09 15:23:56.989 [main] INFO  JobContainer - DataX jobContainer starts job.
2022-12-09 15:23:56.991 [main] INFO  JobContainer - Set jobId = 0
2022-12-09 15:23:57.003 [job-0] INFO  KafkaWriter$Job - kafka writer params:{"bootstrapServers":"10.16.0.2:9092","fieldDelimiter":",","topic":"behavior_test"}
2022-12-09 15:23:57.004 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2022-12-09 15:23:57.004 [job-0] INFO  JobContainer - DataX Reader.Job [txtfilereader] do prepare work .
2022-12-09 15:23:57.005 [job-0] INFO  TxtFileReader$Job - add file [/home/hadoop/data.txt] as a candidate to be read.
2022-12-09 15:23:57.005 [job-0] INFO  TxtFileReader$Job - 您即将读取的文件数为: [1]
2022-12-09 15:23:57.005 [job-0] INFO  JobContainer - DataX Writer.Job [kafkawriter] do prepare work .
2022-12-09 15:23:57.006 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2022-12-09 15:23:57.006 [job-0] INFO  JobContainer - Job set Channel-Number to 2 channels.
2022-12-09 15:23:57.006 [job-0] INFO  JobContainer - DataX Reader.Job [txtfilereader] splits to [1] tasks.
2022-12-09 15:23:57.006 [job-0] INFO  JobContainer - DataX Writer.Job [kafkawriter] splits to [1] tasks.
2022-12-09 15:23:57.021 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2022-12-09 15:23:57.024 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2022-12-09 15:23:57.025 [job-0] INFO  JobContainer - Running by standalone Mode.
2022-12-09 15:23:57.030 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2022-12-09 15:23:57.033 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2022-12-09 15:23:57.034 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2022-12-09 15:23:57.042 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2022-12-09 15:23:57.042 [0-0-0-reader] INFO  TxtFileReader$Task - reading file : [/home/hadoop/data.txt]
2022-12-09 15:23:57.058 [0-0-0-writer] INFO  ProducerConfig - ProducerConfig values: 
        acks = all
        batch.size = 16384
        bootstrap.servers = [10.16.0.2:9092]
        buffer.memory = 33554432
        client.id = 
        compression.type = none
        connections.max.idle.ms = 540000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 1
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 0
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2022-12-09 15:23:57.105 [0-0-0-reader] INFO  UnstructuredStorageReaderUtil - CsvReader使用默认值[{"captureRawRecord":true,"columnCount":0,"comment":"#","currentRecord":-1,"delimiter":",","escapeMode":1,"headerCount":0,"rawRecord":"","recordDelimiter":"\u0000","safetySwitch":false,"skipEmptyRecords":true,"textQualifier":"\"","trimWhitespace":true,"useComments":false,"useTextQualifier":true,"values":[]}],csvReaderConfig值为[null]
2022-12-09 15:23:57.158 [0-0-0-writer] INFO  AppInfoParser - Kafka version : 1.1.1
2022-12-09 15:23:57.158 [0-0-0-writer] INFO  AppInfoParser - Kafka commitId : 98b6346a977495f6
2022-12-09 15:23:57.159 [0-0-0-writer] INFO  KafkaWriter$Task - start to writer kafka
2022-12-09 15:23:57.227 [kafka-producer-network-thread | producer-1] INFO  Metadata - Cluster ID: 4SU0GyNWQpOfGrHCJfZwQw
2022-12-09 15:23:57.236 [0-0-0-writer] INFO  KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2022-12-09 15:23:57.243 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[202]ms
2022-12-09 15:23:57.243 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2022-12-09 15:24:07.040 [job-0] INFO  StandAloneJobContainerCommunicator - Total 11 records, 33 bytes | Speed 3B/s, 1 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-12-09 15:24:07.040 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2022-12-09 15:24:07.040 [job-0] INFO  JobContainer - DataX Writer.Job [kafkawriter] do post work.
2022-12-09 15:24:07.040 [job-0] INFO  JobContainer - DataX Reader.Job [txtfilereader] do post work.
2022-12-09 15:24:07.040 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2022-12-09 15:24:07.041 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /home/hadoop/datax/hook
2022-12-09 15:24:07.042 [job-0] INFO  JobContainer - 
         [total cpu info] => 
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
                -1.00%                         | -1.00%                         | -1.00%


         [total gc info] => 
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
                 PS MarkSweep         | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             
                 PS Scavenge          | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             

2022-12-09 15:24:07.042 [job-0] INFO  JobContainer - PerfTrace not enable!
2022-12-09 15:24:07.043 [job-0] INFO  StandAloneJobContainerCommunicator - Total 11 records, 33 bytes | Speed 3B/s, 1 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-12-09 15:24:07.043 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2022-12-09 15:23:56
任务结束时刻                    : 2022-12-09 15:24:07
任务总计耗时                    :                 10s
任务平均流量                    :                3B/s
记录写入速度                    :              1rec/s
读出记录总数                    :                  11
读写失败总数                    :                   0

在Kafka查看消息

这里不做截图,实际是发送进来了

脚本样例

读Txt发送Kafka

{
   
    "setting": {
   },
    "job": {
   
        "setting": {
   
            "speed": {
   
                "channel": 2
            }
        },
        "content": [
            {
   
                "reader": {
   
                    "name": "txtfilereader",
                    "parameter": {
   
                        "path": ["/home/hadoop/data.txt"],
                        "encoding": "UTF-8",
                        "column": [

                        ],
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
   
                    "name": "kafkawriter",
                    "parameter": {
   
                        "topic": "behavior_test",
                        "bootstrapServers": "10.16.0.2:9092",
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}

读Hive发送Kafka

同步脚本

 python bin/datax.py -p "-Dday=2020-03-10" job/hive2kafka.json

hive2kafka.json 文件

{
   
    "job": {
   
        "setting": {
   
            "speed": {
   
                "channel": 3
            }
        },
        "content": [
            {
   
                "reader": {
   
                    "name": "hdfsreader",
                    "parameter": {
   
                        "path": "/usr/hive/warehouse/ods.db/ods_tt_clue_intent/dt=${day}",
                        "defaultFS": "hdfs://10.16.0.12:4010",
                        "column": [
                               {
   
                                "index": 0,
                                "type": "string"
                               },
                               {
   
                                 "index": 1,
                                 "type": "string"
                               },
                               {
   
                                 "index": 2,
                                 "type": "string"
                               },
                               {
   
                                 "index": 3,
                                 "type": "string"
                               },
                               {
   
                                 "index": 4,
                                 "type": "string"
                               },
                               {
   
                                 "index": 5,
                                 "type": "string"
                               },
                               {
   
                                 "index": 6,
                                 "type": "string"
                               },
                               {
   
                                 "index": 7,
                                 "type": "string"
                               },
                               {
   
                                 "index": 8,
                                 "type": "string"
                               },
                               {
   
                                 "index": 9,
                                 "type": "string"
                               },
                               {
   
                                 "index": 10,
                                 "type": "string"
                               },
                               {
   
                                 "index": 11,
                                 "type": "string"
                               },
                               {
   
                                 "index": 12,
                                 "type": "string"
                               },
                               {
   
                                 "index": 13,
                                 "type": "string"
                               },
                               {
   
                                 "index": 14,
                                 "type": "string"
                               },
                               {
   
                                 "index": 15,
                                 "type": "string"
                               },
                               {
   
                                 "index": 16,
                                 "type": "string"
                               },
                               {
   
                                 "index": 17,
                                 "type": "string"
                               },
                               {
   
                                 "index": 18,
                                 "type": "string"
                               },
                               {
   
                                 "index": 19,
                                 "type": "string"
                               },
                               {
   
                                 "index": 20,
                                 "type": "string"
                               },
                               {
   
                                 "index": 21,
                                 "type": "string"
                               },
                               {
   
                                 "index": 22,
                                 "type": "string"
                               },
                               {
   
                                 "index": 23,
                                 "type": "string"
                               },
                               {
   
                                 "index": 24,
                                 "type": "string"
                               },
                               {
   
                                 "index": 25,
                                 "type": "string"
                               },
                               {
   
                                 "index": 26,
                                 "type": "string"
                               },
                               {
   
                                 "index": 27,
                                 "type": "string"
                               },
                               {
   
                                 "type": "string",
                                 "value": "${day}"
                               }
                        ],
                        "fieldDelimiter":",",
                        "fileType": "orc",
                        "nullFormat":"\\N"
                    }
                },
                "writer": {
   
                    "name": "kafkawriter",
                    "parameter": {
   
                        "topic": "behavior_test",
                        "bootstrapServers": "10.16.0.2:9092",
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}

参考:https://www.imooc.com/article/259830

目录
相关文章
|
DataX 数据格式 Java
DataX插件编写指南
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、HDFS、Hive、OceanBase、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。DataX采用了框架 + 插件 的模式,目前已开源,代码托管在github。
13363 1
|
3月前
|
DataWorks NoSQL fastjson
DataWorks操作报错合集之DataX进行MongoDB全量迁移的过程中,DataX的MongoDB Reader插件在初始化阶段找不到Fastjson 2.x版本的类库,该怎么办
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
4月前
|
DataWorks Java 调度
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
58 0
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
|
11月前
|
Java 关系型数据库 MySQL
DataX教程(10)- DataX插件热插拔原理
DataX教程(10)- DataX插件热插拔原理
425 0
|
Oracle Java 关系型数据库
聊聊 datax 的 OceanBase 数据同步插件 ||批处理参数 rewriteBatchedStatements=true&useCursorFetch=true
聊聊 datax 的 OceanBase 数据同步插件 分析下批处理参数 rewriteBatchedStatements=true&useCursorFetch=true 对大规模数据读写的性能影响
聊聊 datax 的 OceanBase 数据同步插件 ||批处理参数 rewriteBatchedStatements=true&useCursorFetch=true
|
JSON 分布式计算 关系型数据库
DataX插件开发
面向DataX插件开发人员,阐述开发一个DataX插件所经过的历程,消除开发者的困惑,让插件开发变得简单。
4083 0
|
2月前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之数据集成并发数不支持批量修改,该怎么办
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
16天前
|
存储 分布式计算 DataWorks
dataworks数据集成
dataworks数据集成
48 1
|
29天前
|
机器学习/深度学习 DataWorks 数据挖掘
基于阿里云Hologres和DataWorks数据集成的方案
基于阿里云Hologres和DataWorks数据集成的方案
41 7
|
2月前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之数据集成任务日志中显示wait,是什么原因
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
下一篇
无影云桌面