Flink消费kafka数据

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink消费kafka数据

1.pom

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.8.0</version>
</dependency>

2.接kafka消息

public static FlinkKafkaConsumer010<String> createConsumers(ParameterTool params, String topic) {
    Properties props = new Properties();
    String brokers = params.getProperties().getProperty("kafka.consumer.brokers");
    System.out.println(brokers);
    String groupid = params.getProperties().getProperty("kafka.channel.consumer.group.id");
    System.out.println(groupid);
    //String topic = params.getProperties().getProperty("kafka.web.info.topic");
    System.out.println(topic);
    props.setProperty("bootstrap.servers", brokers);
    props.setProperty("group.id", groupid);
    FlinkKafkaConsumer010<String> consumer010 =
            new FlinkKafkaConsumer010<>(topic, new org.apache.flink.api.common.serialization.SimpleStringSchema(), props);
    return consumer010;
}

3.config

package com.vince.xq.common;
import org.apache.flink.api.java.utils.ParameterTool;
import java.io.InputStream;
public class Configs {
    public static ParameterTool loadConfig(String configFileName) throws Exception {
        try (InputStream is = Configs.class.getClassLoader().getResourceAsStream(configFileName)) {
            return ParameterTool.fromPropertiesFile(is);
        }
    }
}


4.主程序

public static void main(String[] args) throws Exception {
    /*if (args == null || args.length == 0) {
        throw new RuntimeException("config file name must be config, config is args[0]");
    }*/
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //DataStream<String> userStream = env.addSource(new RandomUserSource());
    ParameterTool parameterTool = Configs.loadConfig("config-test.properties");
    env.getConfig().setGlobalJobParameters(parameterTool);
    //String topic = parameterTool.getProperties().getProperty("kafka.user.info.topic");
    String topic="test1115";
    DataStream<String> userStream = env.addSource(Utils.createConsumers(parameterTool, topic));
    DataStream<String> userMapperStream = userStream.flatMap(new UserMapper());
    String index = "sys_user_test";
    String type = "user";
    userMapperStream.addSink(Utils.createEsProducer(parameterTool, index, type));
    env.execute("user-to-es");
}

参考:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html


相关文章
|
2月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
183 0
|
25天前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
157 61
|
2月前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
103 1
|
2月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
77 1
|
2月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
191 0
|
2月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
47 0
|
2月前
|
SQL 分布式计算 大数据
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
57 0
|
2月前
|
大数据 流计算
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
53 0
|
Java 中间件 流计算
Flink 如何分流数据
Flink 如何分流数据,3种分流方式
4119 0