Flink消费kafka数据

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink消费kafka数据

1.pom

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.8.0</version>
</dependency>

2.接kafka消息

public static FlinkKafkaConsumer010<String> createConsumers(ParameterTool params, String topic) {
    Properties props = new Properties();
    String brokers = params.getProperties().getProperty("kafka.consumer.brokers");
    System.out.println(brokers);
    String groupid = params.getProperties().getProperty("kafka.channel.consumer.group.id");
    System.out.println(groupid);
    //String topic = params.getProperties().getProperty("kafka.web.info.topic");
    System.out.println(topic);
    props.setProperty("bootstrap.servers", brokers);
    props.setProperty("group.id", groupid);
    FlinkKafkaConsumer010<String> consumer010 =
            new FlinkKafkaConsumer010<>(topic, new org.apache.flink.api.common.serialization.SimpleStringSchema(), props);
    return consumer010;
}

3.config

package com.vince.xq.common;
import org.apache.flink.api.java.utils.ParameterTool;
import java.io.InputStream;
public class Configs {
    public static ParameterTool loadConfig(String configFileName) throws Exception {
        try (InputStream is = Configs.class.getClassLoader().getResourceAsStream(configFileName)) {
            return ParameterTool.fromPropertiesFile(is);
        }
    }
}


4.主程序

public static void main(String[] args) throws Exception {
    /*if (args == null || args.length == 0) {
        throw new RuntimeException("config file name must be config, config is args[0]");
    }*/
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //DataStream<String> userStream = env.addSource(new RandomUserSource());
    ParameterTool parameterTool = Configs.loadConfig("config-test.properties");
    env.getConfig().setGlobalJobParameters(parameterTool);
    //String topic = parameterTool.getProperties().getProperty("kafka.user.info.topic");
    String topic="test1115";
    DataStream<String> userStream = env.addSource(Utils.createConsumers(parameterTool, topic));
    DataStream<String> userMapperStream = userStream.flatMap(new UserMapper());
    String index = "sys_user_test";
    String type = "user";
    userMapperStream.addSink(Utils.createEsProducer(parameterTool, index, type));
    env.execute("user-to-es");
}

参考:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html


相关文章
|
2月前
|
消息中间件 关系型数据库 Kafka
flink cdc 数据问题之数据丢失如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
116 0
|
2月前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
55 3
|
2月前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之flink Oraclecdc 捕获19C数据时报错错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
19天前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
33 0
|
16天前
|
消息中间件 存储 缓存
【Kakfa】Kafka 的Topic中 Partition 数据是怎么存储到磁盘的?
【4月更文挑战第13天】【Kakfa】Kafka 的Topic中 Partition 数据是怎么存储到磁盘的?
|
2月前
|
分布式计算 Hadoop Java
Flink CDC产品常见问题之tidb cdc 数据量大了就疯狂报空指针如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
资源调度 关系型数据库 测试技术
Flink CDC产品常见问题之没有报错但是一直监听不到数据如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
消息中间件 存储 缓存
Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
43 1
|
2月前
|
消息中间件 关系型数据库 MySQL
Flink CDC产品常见问题之把flink cdc同步的数据写入到目标服务器失败如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
消息中间件 canal Kafka
flink cdc 数据问题之数据堆积严重如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。

热门文章

最新文章