【Flink-FlinkUtils】高级自定义封装工具类实现消费kafka数据保存数据到Redis

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink-FlinkUtils】高级自定义封装工具类实现消费kafka数据保存数据到Redis

一、FlinkKafkaToRedis


思考一个问题:flink程序运行的时候,我们可以通过神魔样的形式进行传值?

1.写死程序传值

2.args[0],程序动态传值

ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);
DataStream<String> lines =  FlinkUtils.createKafkaStream(parameters,SimpleStringSchema.class);
String groupId = parameters.get("group.id","consumer1");
String topics = parameters.getRequired("topics");

3.配置文件动态读取


20200924082305850.png


1.1 pom

    <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
    <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>

1.2 config.properties

# 可以传入多个topic,多个 ,隔开
topics=wang
group.id=consumer1
bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092
# earliest
auto.offset.reset=latest
# kafka不提交偏移量,由flink管理checkpoint
enable.auto.commit=false
# 30s写入内存一次 默认是内存,由于我没有指定checkpoint目录,会保存与jobManager的内存中
# 你自己可以配置到 HDFS 例如:
# env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));
# env.setStateBackend(new FsStateBackend("file:///D://APP//IDEA//workplace//FlinkTurbineFaultDiagnosis//checkpoint"));
checkpoint.interval=30000
# redis
redis.host=127.0.0.1
#redis.pwd=123456
redis.db=1

1.3 FlinkUtils.java

public class FlinkUtils {
    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    public static <T> DataStream<T> createKafkaStream(ParameterTool parameters, Class<? extends DeserializationSchema<T>> clazz) throws Exception {
        //1.设置全局的参数
        env.getConfig().setGlobalJobParameters(parameters);
        //2.checkpoint配置
        env.enableCheckpointing(parameters.getLong("checkpoint.interval", 5000L), CheckpointingMode.EXACTLY_ONCE);
        //3.取消checkpoint任务不删除
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //4.kafka配置
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", parameters.getRequired("bootstrap.servers"));
        prop.setProperty("group.id", parameters.getRequired("group.id"));
        prop.setProperty("auto.offset.reset", parameters.get("auto.offset.reset", "earliest"));
        //5.不自动提交偏移量,交给flink的checkpoint处理哦
        prop.setProperty("enable.auto.commit", parameters.get("enable.auto.commit", "false"));
        String topics = parameters.getRequired("topics");
        List<String> topicList = Arrays.asList(topics.split(","));
        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<T>(
                topicList,
                clazz.newInstance(),
                prop);
        return env.addSource(kafkaConsumer);
    }
    //获取执行环境
    public static StreamExecutionEnvironment getEnv() {
        return env;
    }
}

1.4 MyRedisSink.java

public class MyRedisSink extends RichSinkFunction<Turbine> {
    //初始化redis连接
    private transient Jedis jedis;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String host = params.getRequired("redis.host");
        //String password = params.getRequired("redis.pwd");
        int db = params.getInt("redis.db", 0);
        jedis = new Jedis(host, 6379, 5000);
        //jedis.auth(password);
        jedis.select(db);
    }
    @Override
    public void invoke(Turbine value, Context context) throws Exception {
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        //写入redis
        jedis.hset(value.word, value.province, String.valueOf(value.counts));
    }
    @Override
    public void close() throws Exception {
        super.close();
        jedis.close();
    }
}

1.5 KafkaToRedis

public class KafkaToRedis {
    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromPropertiesFile("D:\\APP\\IDEA\\workplace\\FlinkTurbineFaultDiagnosis\\src\\main\\resources\\config.properties");
        DataStream<String> lines = FlinkUtils.createKafkaStream(parameters, SimpleStringSchema.class);
        lines.print();
        //输入的时String  返回一个对象
        SingleOutputStreamOperator<Turbine> map = lines.map(new MapFunction<String, Turbine>() {
            @Override
            public Turbine map(String value) throws Exception {
                String[] fields = value.split(" ");
                String word = fields[0];
                String province = fields[1];
                long counts = Long.parseLong(fields[2]);
                return Turbine.of(word, province, counts);
            }
        });
        map.addSink(new MyRedisSink());
        //执行程序
        FlinkUtils.getEnv().execute();
    }
}


目录
相关文章
|
1月前
|
NoSQL Redis
Redis的数据淘汰策略有哪些 ?
Redis 提供了 8 种数据淘汰策略,分为淘汰易失数据和淘汰全库数据两大类。易失数据淘汰策略包括:volatile-lru、volatile-lfu、volatile-ttl 和 volatile-random;全库数据淘汰策略包括:allkeys-lru、allkeys-lfu 和 allkeys-random。此外,还有 no-eviction 策略,禁止驱逐数据,当内存不足时新写入操作会报错。
73 16
|
1月前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
在项目中,为了解决Redis与Mysql的数据一致性问题,我们采用了多种策略:对于低一致性要求的数据,不做特别处理;时效性数据通过设置缓存过期时间来减少不一致风险;高一致性但时效性要求不高的数据,利用MQ异步同步确保最终一致性;而对一致性和时效性都有高要求的数据,则采用分布式事务(如Seata TCC模式)来保障。
64 14
|
1月前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
40 5
|
1月前
|
存储 NoSQL 算法
Redis分片集群中数据是怎么存储和读取的 ?
Redis集群采用哈希槽分区算法,共有16384个哈希槽,每个槽分配到不同的Redis节点上。数据操作时,通过CRC16算法对key计算并取模,确定其所属的槽和对应的节点,从而实现高效的数据存取。
49 13
|
1月前
|
存储 NoSQL Redis
Redis的数据过期策略有哪些 ?
Redis 采用两种过期键删除策略:惰性删除和定期删除。惰性删除在读取键时检查是否过期并删除,对 CPU 友好但可能积压大量过期键。定期删除则定时抽样检查并删除过期键,对内存更友好。默认每秒扫描 10 次,每次检查 20 个键,若超过 25% 过期则继续检查,单次最大执行时间 25ms。两者结合使用以平衡性能和资源占用。
47 11
|
1月前
|
监控 NoSQL 测试技术
【赵渝强老师】Redis的AOF数据持久化
Redis 是内存数据库,提供数据持久化功能,支持 RDB 和 AOF 两种方式。AOF 以日志形式记录每个写操作,支持定期重写以压缩文件。默认情况下,AOF 功能关闭,需在 `redis.conf` 中启用。通过 `info` 命令可监控 AOF 状态。AOF 重写功能可有效控制文件大小,避免性能下降。
|
1月前
|
存储 监控 NoSQL
【赵渝强老师】Redis的RDB数据持久化
Redis 是内存数据库,提供数据持久化功能以防止服务器进程退出导致数据丢失。Redis 支持 RDB 和 AOF 两种持久化方式,其中 RDB 是默认的持久化方式。RDB 通过在指定时间间隔内将内存中的数据快照写入磁盘,确保数据的安全性和恢复能力。RDB 持久化机制包括创建子进程、将数据写入临时文件并替换旧文件等步骤。优点包括适合大规模数据恢复和低数据完整性要求的场景,但也有数据完整性和一致性较低及备份时占用内存的缺点。
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
103 1
|
2月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
55 1
|
4月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
345 9
下一篇
DataWorks