【Flink-FlinkUtils】高级自定义封装工具类实现消费kafka数据保存数据到Redis

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 【Flink-FlinkUtils】高级自定义封装工具类实现消费kafka数据保存数据到Redis

一、FlinkKafkaToRedis


思考一个问题:flink程序运行的时候,我们可以通过神魔样的形式进行传值?

1.写死程序传值

2.args[0],程序动态传值

ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);
DataStream<String> lines =  FlinkUtils.createKafkaStream(parameters,SimpleStringSchema.class);
String groupId = parameters.get("group.id","consumer1");
String topics = parameters.getRequired("topics");

3.配置文件动态读取


20200924082305850.png


1.1 pom

    <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
    <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>

1.2 config.properties

# 可以传入多个topic,多个 ,隔开
topics=wang
group.id=consumer1
bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092
# earliest
auto.offset.reset=latest
# kafka不提交偏移量,由flink管理checkpoint
enable.auto.commit=false
# 30s写入内存一次 默认是内存,由于我没有指定checkpoint目录,会保存与jobManager的内存中
# 你自己可以配置到 HDFS 例如:
# env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));
# env.setStateBackend(new FsStateBackend("file:///D://APP//IDEA//workplace//FlinkTurbineFaultDiagnosis//checkpoint"));
checkpoint.interval=30000
# redis
redis.host=127.0.0.1
#redis.pwd=123456
redis.db=1

1.3 FlinkUtils.java

public class FlinkUtils {
    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    public static <T> DataStream<T> createKafkaStream(ParameterTool parameters, Class<? extends DeserializationSchema<T>> clazz) throws Exception {
        //1.设置全局的参数
        env.getConfig().setGlobalJobParameters(parameters);
        //2.checkpoint配置
        env.enableCheckpointing(parameters.getLong("checkpoint.interval", 5000L), CheckpointingMode.EXACTLY_ONCE);
        //3.取消checkpoint任务不删除
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //4.kafka配置
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", parameters.getRequired("bootstrap.servers"));
        prop.setProperty("group.id", parameters.getRequired("group.id"));
        prop.setProperty("auto.offset.reset", parameters.get("auto.offset.reset", "earliest"));
        //5.不自动提交偏移量,交给flink的checkpoint处理哦
        prop.setProperty("enable.auto.commit", parameters.get("enable.auto.commit", "false"));
        String topics = parameters.getRequired("topics");
        List<String> topicList = Arrays.asList(topics.split(","));
        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<T>(
                topicList,
                clazz.newInstance(),
                prop);
        return env.addSource(kafkaConsumer);
    }
    //获取执行环境
    public static StreamExecutionEnvironment getEnv() {
        return env;
    }
}

1.4 MyRedisSink.java

public class MyRedisSink extends RichSinkFunction<Turbine> {
    //初始化redis连接
    private transient Jedis jedis;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String host = params.getRequired("redis.host");
        //String password = params.getRequired("redis.pwd");
        int db = params.getInt("redis.db", 0);
        jedis = new Jedis(host, 6379, 5000);
        //jedis.auth(password);
        jedis.select(db);
    }
    @Override
    public void invoke(Turbine value, Context context) throws Exception {
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        //写入redis
        jedis.hset(value.word, value.province, String.valueOf(value.counts));
    }
    @Override
    public void close() throws Exception {
        super.close();
        jedis.close();
    }
}

1.5 KafkaToRedis

public class KafkaToRedis {
    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromPropertiesFile("D:\\APP\\IDEA\\workplace\\FlinkTurbineFaultDiagnosis\\src\\main\\resources\\config.properties");
        DataStream<String> lines = FlinkUtils.createKafkaStream(parameters, SimpleStringSchema.class);
        lines.print();
        //输入的时String  返回一个对象
        SingleOutputStreamOperator<Turbine> map = lines.map(new MapFunction<String, Turbine>() {
            @Override
            public Turbine map(String value) throws Exception {
                String[] fields = value.split(" ");
                String word = fields[0];
                String province = fields[1];
                long counts = Long.parseLong(fields[2]);
                return Turbine.of(word, province, counts);
            }
        });
        map.addSink(new MyRedisSink());
        //执行程序
        FlinkUtils.getEnv().execute();
    }
}


目录
相关文章
|
3天前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之想要加快消费 Kafka 数据的速度,该怎么配置参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
16 2
|
5天前
|
存储 监控 负载均衡
保证Redis的高可用性是一个涉及多个层面的任务,主要包括数据持久化、复制与故障转移、集群化部署等方面
【5月更文挑战第15天】保证Redis高可用性涉及数据持久化、复制与故障转移、集群化及优化策略。RDB和AOF是数据持久化方法,哨兵模式确保故障自动恢复。Redis Cluster实现分布式部署,提高负载均衡和容错性。其他措施包括身份认证、多线程、数据压缩和监控报警,以增强安全性和稳定性。通过综合配置与监控,可确保Redis服务的高效、可靠运行。
27 2
|
5天前
|
存储 监控 NoSQL
Redis处理大量数据主要依赖于其内存存储结构、高效的数据结构和算法,以及一系列的优化策略
【5月更文挑战第15天】Redis处理大量数据依赖内存存储、高效数据结构和优化策略。选择合适的数据结构、利用批量操作减少网络开销、控制批量大小、使用Redis Cluster进行分布式存储、优化内存使用及监控调优是关键。通过这些方法,Redis能有效处理大量数据并保持高性能。
25 0
|
1天前
|
消息中间件 Kubernetes Java
实时计算 Flink版操作报错合集之写入 Kafka 报错 "Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time 60000 ms",该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
11 0
|
1天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
19 0
|
1天前
|
SQL 消息中间件 Kafka
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
14 0
|
1天前
|
网络安全 流计算 Python
实时计算 Flink版操作报错合集之Flink sql-client 针对kafka的protobuf格式数据建表,报错:java.lang.ClassNotFoundException 如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
14 1
|
2天前
|
存储 缓存 NoSQL
由菜鸟到大神,谈谈redis的概念、实战、原理、高级使用方法
【5月更文挑战第18天】Redis是一个开源的内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。它支持多种类型的数据结构,如字符串、哈希、列表、集合、有序集合等。
20 10
|
2天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到报错:Apache Kafka Connect错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
13 5
|
2天前
|
消息中间件 Kafka 数据库连接
实时计算 Flink版操作报错合集之无法将消费到的偏移量提交到Kafka如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
12 3