Download the Source Code
Source code: https://github.com/alibaba/DataX/releases/tag/datax_v202210
This guide uses the datax_v202210 release.
DataX user guide: https://github.com/alibaba/DataX/blob/master/introduction.md
DataX plugin development guide: https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md
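For example, the tagged source can be fetched and unpacked from the command line (a sketch only; the URL follows GitHub's standard tag-archive pattern, and you can equally download the archive from the release page in a browser):
# download and unpack the datax_v202210 source tag
wget https://github.com/alibaba/DataX/archive/refs/tags/datax_v202210.tar.gz
tar -zxvf datax_v202210.tar.gz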
Plugin Development
Create the kafkawriter module
pom.xml
The POM mainly adds the kafka-clients dependency on top of DataX's default dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafkawriter</artifactId>
<properties>
<kafka.version>1.1.1</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Notes:
- maven-assembly-plugin
First, understand how DataX packages its modules: with maven-assembly-plugin.
1. Purpose: it is Maven's standard packaging plugin for building your program together with all the jars it depends on into a single package.
2. Other capabilities:
1) Collects project dependencies, modules, site documentation, and other files into a single archive.
2) Produces distribution packages in a chosen format; all mainstream formats such as zip, tar.gz, jar, and war are supported, and exactly which files get packaged is highly configurable.
3) Supports custom include/exclude rules for specific directories and files.
In short, using maven-assembly-plugin takes two steps:
Step 1: configure maven-assembly-plugin in pom.xml and point it at a descriptor file.
Step 2: configure the concrete parameters in that descriptor file.
The kafkawriter pom.xml therefore needs the following:
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors> <!-- path to the descriptor file -->
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase> <!-- bind to the package lifecycle phase -->
<goals>
<goal>single</goal> <!-- run only once -->
</goals>
</execution>
</executions>
</plugin>
plugin.json
Location: src/main/resources/plugin.json
{
"name": "kafkawriter",
"class": "com.alibaba.datax.plugin.writer.KafkaWriter",
"description": "Kafka Writer",
"developer": "Jast"
}
package.xml
Location: src/main/assembly/package.xml
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
</includes>
<outputDirectory>plugin/writer/kafkawriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>kafkawriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/kafkawriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/kafkawriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
Classes
com.alibaba.datax.plugin.writer.KafkaWriter
package com.alibaba.datax.plugin.writer;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* Job and Task must never share variables: under distributed execution there is no guarantee that
* shared state is initialized correctly, so the two may only depend on each other through configuration.
* prepare and post exist on both Job and Task; the plugin decides where each piece of work belongs.
* @author mac
*/
public class KafkaWriter extends Writer {
public static class Job extends Writer.Job {
private static final Logger log = LoggerFactory.getLogger(Job.class);
private Configuration conf = null;
/**
* init: initialization of the Job object. At this point super.getPluginJobConf() returns the
* configuration relevant to this plugin: a reader plugin gets the reader section, a writer plugin gets the writer section.
*/
@Override
public void init() {
this.conf = super.getPluginJobConf(); // the writer's "parameter" block from the job configuration
log.info("kafka writer params:{}", conf.toJSON());
// validate the configured parameters
this.validateParameter();
}
private void validateParameter() {
// topic and bootstrapServers are required
this.conf
.getNecessaryValue(
Key.TOPIC,
KafkaWriterErrorCode.REQUIRED_VALUE);
this.conf
.getNecessaryValue(
Key.BOOTSTRAP_SERVERS,
KafkaWriterErrorCode.REQUIRED_VALUE);
}
/**
* prepare: global preparation work, e.g. odpswriter truncating the target table.
*/
@Override
public void prepare() {
}
/**
* split: splits the work into Tasks. The adviceNumber argument is the number of splits suggested by the
* framework, normally the configured concurrency. Returns the list of Task configurations.
* @param mandatoryNumber to keep the Reader and Writer task counts equal, the Writer plugin must split
* into exactly as many tasks as the source side; otherwise the framework reports an error.
* @return the per-Task configurations
*/
@Override
public List<Configuration> split(int mandatoryNumber) {
// produce the same number of writer configurations as there are reader splits
List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
for (int i = 0; i < mandatoryNumber; i++) {
configurations.add(conf);
}
return configurations;
}
/**
* post: global post-processing, e.g. mysqlwriter renaming the shadow table after the sync completes.
*/
@Override
public void post() {
log.info("job destroy ");
}
/**
* destroy: cleanup of the Job object itself.
*/
@Override
public void destroy() {
}
}
public static class Task extends Writer.Task {
private static final Logger log = LoggerFactory.getLogger(Task.class);
private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");
private Producer<String, String> producer;
private String fieldDelimiter;
private Configuration conf;
/**
* init: initialization of the Task object. super.getPluginJobConf() returns the configuration for this
* Task, which is one element of the list returned by the Job's split method.
*/
@Override
public void init() {
this.conf = super.getPluginJobConf();
fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
// initialize the Kafka producer
Properties props = new Properties();
props.put("bootstrap.servers", conf.getString(Key.BOOTSTRAP_SERVERS));
// acks=all: the leader waits until all in-sync replicas have written the record, so data is not lost
// as long as at least one replica survives. This is the strongest delivery guarantee.
props.put("acks", "all");
props.put("retries", 0);
// batch.size controls how many bytes the sender accumulates before publishing a batch to Kafka.
// The producer sends as soon as either batch.size or linger.ms is satisfied. Default is 16384 (16 KB).
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(props);
}
/**
* prepare: per-Task preparation work.
*/
@Override
public void prepare() {
super.prepare();
}
/**
* startWrite: reads records from the RecordReceiver and writes them to the target data source. The
* records come from the buffer queue between the Reader and the Writer.
* @param lineReceiver
*/
@Override
public void startWrite(RecordReceiver lineReceiver) {
log.info("start to writer kafka");
Record record = null;
while ((record = lineReceiver.getFromReader()) != null) {
// still receiving data (or buffered data remains to be processed):
// take one record, join its columns with the configured delimiter, and send the resulting string,
// which is used here as both the message key and the message value
producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
recordToString(record), recordToString(record)));
}
}
/**
* destroy: cleanup of the Task object itself.
*/
@Override
public void destroy() {
if (producer != null) {
log.info("Waiting for buffered messages to be sent");
producer.flush();
log.info("Messages sent successfully");
producer.close();
}
}
private String recordToString(Record record) {
int recordLength = record.getColumnNumber();
if (0 == recordLength) {
return NEWLINE_FLAG;
}
Column column;
StringBuilder sb = new StringBuilder();
for (int i = 0; i < recordLength; i++) {
column = record.getColumn(i);
sb.append(column.asString()).append(fieldDelimiter);
}
sb.setLength(sb.length() - 1);
sb.append(NEWLINE_FLAG);
return sb.toString();
}
}
}
com.alibaba.datax.plugin.writer.KafkaWriterErrorCode
package com.alibaba.datax.plugin.writer;
import com.alibaba.datax.common.spi.ErrorCode;
public enum KafkaWriterErrorCode implements ErrorCode {
REQUIRED_VALUE("KafkaWriter-00", "Required parameter is not filled.")
;
private final String code;
private final String description;
KafkaWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s]. ", this.code,
this.description);
}
}
com.alibaba.datax.plugin.writer.Key
package com.alibaba.datax.plugin.writer;
public class Key {
public static final String FIELD_DELIMITER = "fieldDelimiter";
public static final String BOOTSTRAP_SERVERS = "bootstrapServers";
public static final String TOPIC = "topic";
}
In the DataX project root directory, register the new module in the root pom.xml (add kafkawriter to the <modules> list so it is built), then edit the root package.xml and add the following to fileSets:
<fileSet>
<directory>kafkawriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
Packaging
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
Alternatively, package directly from within IDEA.
The generated plugin ends up under DataX-datax_v202210/kafkawriter/target/datax/plugin
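Based on the assembly descriptor above, the plugin directory should contain roughly the following (an illustrative listing, not verbatim output):
$ ls DataX-datax_v202210/kafkawriter/target/datax/plugin/writer/kafkawriter
kafkawriter-0.0.1-SNAPSHOT.jar  libs  plugin.json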
Install DataX
Download DataX
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz
Extract
tar -zxvf datax.tar.gz
Upload the custom KafkaWriter
Upload the plugin directory you built to the DataX installation directory and extract it there.
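A possible sequence (a sketch only; it assumes the plugin was built locally under DataX-datax_v202210/kafkawriter/target/datax/ and that DataX is installed at ~/datax on a host reachable as <datax-host>):
# on the build machine: archive the generated plugin directory
cd DataX-datax_v202210/kafkawriter/target/datax
tar -czf kafkawriter-plugin.tar.gz plugin
scp kafkawriter-plugin.tar.gz hadoop@<datax-host>:~/datax/
# on the DataX host: unpack into the installation, which creates plugin/writer/kafkawriter
ssh hadoop@<datax-host> 'cd ~/datax && tar -xzf kafkawriter-plugin.tar.gz && rm kafkawriter-plugin.tar.gz'
The listing below shows the expected result: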
[hadoop@10 ~/datax]$ ll plugin/writer/
total 148
.....
drwxr-xr-x 3 hadoop hadoop 4096 Dec 9 15:22 kafkawriter
.....
Create the job configuration file text2kafka.json
What it does: reads a text file and sends its contents to Kafka.
{
"setting": {
},
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": ["/home/hadoop/data.txt"],
"encoding": "UTF-8",
"column": [
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "kafkawriter",
"parameter": {
"topic": "behavior_test",
"bootstrapServers": "10.16.0.2:9092",
"fieldDelimiter": ","
}
}
}
]
}
}
Run
python bin/datax.py job/text2kafka.json
Possible error:
2022-12-09 15:18:30.412 [main] WARN ConfigParser - 插件[txtfilereader,kafkawriter]加载失败,1s后重试... Exception:Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/home/hadoop/datax/plugin/writer/.DS_Store/plugin.json]不存在. 请检查您的配置文件.
Cause: when packaging on a Mac, .DS_Store files are silently included; they must be deleted, otherwise DataX fails to parse the plugin directory.
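One way to clean them up (a minimal sketch, assuming DataX is installed at ~/datax):
find ~/datax/plugin -name ".DS_Store" -delete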
Execution output
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
2022-12-09 15:23:56.947 [main] INFO MessageSource - JVM TimeZone: GMT+08:00, Locale: zh_CN
2022-12-09 15:23:56.949 [main] INFO MessageSource - use Locale: zh_CN timeZone: sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]
2022-12-09 15:23:56.959 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2022-12-09 15:23:56.963 [main] INFO Engine - the machine info =>
osInfo: Tencent 1.8 25.282-b1
jvmInfo: Linux amd64 5.4.119-19-0007
cpu num: 8
totalPhysicalMemory: -0.00G
freePhysicalMemory: -0.00G
maxFileDescriptorCount: -1
currentOpenFileDescriptorCount: -1
GC Names [PS MarkSweep, PS Scavenge]
MEMORY_NAME | allocation_size | init_size
PS Eden Space | 256.00MB | 256.00MB
Code Cache | 240.00MB | 2.44MB
Compressed Class Space | 1,024.00MB | 0.00MB
PS Survivor Space | 42.50MB | 42.50MB
PS Old Gen | 683.00MB | 683.00MB
Metaspace | -0.00MB | 0.00MB
2022-12-09 15:23:56.976 [main] INFO Engine -
{
"content":[
{
"reader":{
"name":"txtfilereader",
"parameter":{
"column":[],
"encoding":"UTF-8",
"fieldDelimiter":",",
"path":[
"/home/hadoop/data.txt"
]
}
},
"writer":{
"name":"kafkawriter",
"parameter":{
"bootstrapServers":"10.16.0.2:9092",
"fieldDelimiter":",",
"topic":"behavior_test"
}
}
}
],
"setting":{
"speed":{
"channel":2
}
}
}
2022-12-09 15:23:56.988 [main] WARN Engine - prioriy set to 0, because NumberFormatException, the value is: null
2022-12-09 15:23:56.989 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2022-12-09 15:23:56.989 [main] INFO JobContainer - DataX jobContainer starts job.
2022-12-09 15:23:56.991 [main] INFO JobContainer - Set jobId = 0
2022-12-09 15:23:57.003 [job-0] INFO KafkaWriter$Job - kafka writer params:{"bootstrapServers":"10.16.0.2:9092","fieldDelimiter":",","topic":"behavior_test"}
2022-12-09 15:23:57.004 [job-0] INFO JobContainer - jobContainer starts to do prepare ...
2022-12-09 15:23:57.004 [job-0] INFO JobContainer - DataX Reader.Job [txtfilereader] do prepare work .
2022-12-09 15:23:57.005 [job-0] INFO TxtFileReader$Job - add file [/home/hadoop/data.txt] as a candidate to be read.
2022-12-09 15:23:57.005 [job-0] INFO TxtFileReader$Job - 您即将读取的文件数为: [1]
2022-12-09 15:23:57.005 [job-0] INFO JobContainer - DataX Writer.Job [kafkawriter] do prepare work .
2022-12-09 15:23:57.006 [job-0] INFO JobContainer - jobContainer starts to do split ...
2022-12-09 15:23:57.006 [job-0] INFO JobContainer - Job set Channel-Number to 2 channels.
2022-12-09 15:23:57.006 [job-0] INFO JobContainer - DataX Reader.Job [txtfilereader] splits to [1] tasks.
2022-12-09 15:23:57.006 [job-0] INFO JobContainer - DataX Writer.Job [kafkawriter] splits to [1] tasks.
2022-12-09 15:23:57.021 [job-0] INFO JobContainer - jobContainer starts to do schedule ...
2022-12-09 15:23:57.024 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2022-12-09 15:23:57.025 [job-0] INFO JobContainer - Running by standalone Mode.
2022-12-09 15:23:57.030 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2022-12-09 15:23:57.033 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2022-12-09 15:23:57.034 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2022-12-09 15:23:57.042 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2022-12-09 15:23:57.042 [0-0-0-reader] INFO TxtFileReader$Task - reading file : [/home/hadoop/data.txt]
2022-12-09 15:23:57.058 [0-0-0-writer] INFO ProducerConfig - ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [10.16.0.2:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 1
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2022-12-09 15:23:57.105 [0-0-0-reader] INFO UnstructuredStorageReaderUtil - CsvReader使用默认值[{"captureRawRecord":true,"columnCount":0,"comment":"#","currentRecord":-1,"delimiter":",","escapeMode":1,"headerCount":0,"rawRecord":"","recordDelimiter":"\u0000","safetySwitch":false,"skipEmptyRecords":true,"textQualifier":"\"","trimWhitespace":true,"useComments":false,"useTextQualifier":true,"values":[]}],csvReaderConfig值为[null]
2022-12-09 15:23:57.158 [0-0-0-writer] INFO AppInfoParser - Kafka version : 1.1.1
2022-12-09 15:23:57.158 [0-0-0-writer] INFO AppInfoParser - Kafka commitId : 98b6346a977495f6
2022-12-09 15:23:57.159 [0-0-0-writer] INFO KafkaWriter$Task - start to writer kafka
2022-12-09 15:23:57.227 [kafka-producer-network-thread | producer-1] INFO Metadata - Cluster ID: 4SU0GyNWQpOfGrHCJfZwQw
2022-12-09 15:23:57.236 [0-0-0-writer] INFO KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2022-12-09 15:23:57.243 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[202]ms
2022-12-09 15:23:57.243 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
2022-12-09 15:24:07.040 [job-0] INFO StandAloneJobContainerCommunicator - Total 11 records, 33 bytes | Speed 3B/s, 1 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-12-09 15:24:07.040 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2022-12-09 15:24:07.040 [job-0] INFO JobContainer - DataX Writer.Job [kafkawriter] do post work.
2022-12-09 15:24:07.040 [job-0] INFO JobContainer - DataX Reader.Job [txtfilereader] do post work.
2022-12-09 15:24:07.040 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2022-12-09 15:24:07.041 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /home/hadoop/datax/hook
2022-12-09 15:24:07.042 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu | maxDeltaCpu | minDeltaCpu
-1.00% | -1.00% | -1.00%
[total gc info] =>
NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
PS MarkSweep | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
PS Scavenge | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
2022-12-09 15:24:07.042 [job-0] INFO JobContainer - PerfTrace not enable!
2022-12-09 15:24:07.043 [job-0] INFO StandAloneJobContainerCommunicator - Total 11 records, 33 bytes | Speed 3B/s, 1 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-12-09 15:24:07.043 [job-0] INFO JobContainer -
任务启动时刻 : 2022-12-09 15:23:56
任务结束时刻 : 2022-12-09 15:24:07
任务总计耗时 : 10s
任务平均流量 : 3B/s
记录写入速度 : 1rec/s
读出记录总数 : 11
读写失败总数 : 0
Check the messages in Kafka
No screenshot here; the messages did arrive in the topic.
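To verify, you can consume the topic with Kafka's console consumer (assuming a standard Kafka installation on the broker host and the addresses used above):
bin/kafka-console-consumer.sh --bootstrap-server 10.16.0.2:9092 --topic behavior_test --from-beginning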
Sample Job Configurations
Text file to Kafka
{
"setting": {
},
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": ["/home/hadoop/data.txt"],
"encoding": "UTF-8",
"column": [
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "kafkawriter",
"parameter": {
"topic": "behavior_test",
"bootstrapServers": "10.16.0.2:9092",
"fieldDelimiter": ","
}
}
}
]
}
}
Hive to Kafka
Launch command (-p "-Dday=2020-03-10" passes a variable that replaces ${day} in the job file):
python bin/datax.py -p "-Dday=2020-03-10" job/hive2kafka.json
hive2kafka.json:
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/usr/hive/warehouse/ods.db/ods_tt_clue_intent/dt=${day}",
"defaultFS": "hdfs://10.16.0.12:4010",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
},
{
"index": 6,
"type": "string"
},
{
"index": 7,
"type": "string"
},
{
"index": 8,
"type": "string"
},
{
"index": 9,
"type": "string"
},
{
"index": 10,
"type": "string"
},
{
"index": 11,
"type": "string"
},
{
"index": 12,
"type": "string"
},
{
"index": 13,
"type": "string"
},
{
"index": 14,
"type": "string"
},
{
"index": 15,
"type": "string"
},
{
"index": 16,
"type": "string"
},
{
"index": 17,
"type": "string"
},
{
"index": 18,
"type": "string"
},
{
"index": 19,
"type": "string"
},
{
"index": 20,
"type": "string"
},
{
"index": 21,
"type": "string"
},
{
"index": 22,
"type": "string"
},
{
"index": 23,
"type": "string"
},
{
"index": 24,
"type": "string"
},
{
"index": 25,
"type": "string"
},
{
"index": 26,
"type": "string"
},
{
"index": 27,
"type": "string"
},
{
"type": "string",
"value": "${day}"
}
],
"fieldDelimiter":",",
"fileType": "orc",
"nullFormat":"\\N"
}
},
"writer": {
"name": "kafkawriter",
"parameter": {
"topic": "behavior_test",
"bootstrapServers": "10.16.0.2:9092",
"fieldDelimiter": ","
}
}
}
]
}
}