Flink-Redis-Sink

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
实时计算 Flink 版,5000CU*H 3个月
简介: 简介流式计算中,我们经常有一些场景是消费Kafka数据,进行处理,然后存储到其他的数据库或者缓存或者重新发送回其他的消息队列中。本文讲述一个简单的Redis作为Sink的案例。后续,我们会补充完善,比如落入Hbase,Kafka,Mysql等。

简介

流式计算中,我们经常有一些场景是消费Kafka数据,进行处理,然后存储到其他的数据库或者缓存或者重新发送回其他的消息队列中。
本文讲述一个简单的Redis作为Sink的案例。
后续,我们会补充完善,比如落入Hbase,Kafka,Mysql等。

关于Redis Sink

Flink提供了封装好的写入Redis的包给我们用,首先我们要新增一个依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.10</artifactId>
    <version>1.1.5</version>
</dependency>

然后我们实现一个自己的RedisSinkExample:

//指定Redis set
public static final class RedisSinkExample implements RedisMapper<Tuple2<String,Integer>> {
public RedisCommandDescription getCommandDescription() {
    return new RedisCommandDescription(RedisCommand.SET, null);
}

public String getKeyFromData(Tuple2<String, Integer> data) {
    return data.f0;
}

public String getValueFromData(Tuple2<String, Integer> data) {
    return data.f1.toString();
}
}

我们用最简单的单机Redis的SET命令进行演示。

完整的代码如下,实现一个读取Kafka的消息,然后进行WordCount,并把结果更新到redis中:


public class RedisSinkTest {

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(2000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

//连接kafka
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
consumer.setStartFromEarliest();
DataStream<String> stream = env.addSource(consumer);
DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);

//实例化FlinkJedisPoolConfig 配置redis
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setHost("6379").build();
//实例化RedisSink,并通过flink的addSink的方式将flink计算的结果插入到redis

counts.addSink(new RedisSink<>(conf,new RedisSinkExample()));
env.execute("WordCount From Kafka To Redis");

}//
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    String[] tokens = value.toLowerCase().split("\\W+");
    for (String token : tokens) {
        if (token.length() > 0) {
            out.collect(new Tuple2<String, Integer>(token, 1));
        }
    }
}
}
//指定Redis set
public static final class RedisSinkExample implements RedisMapper<Tuple2<String,Integer>> {
public RedisCommandDescription getCommandDescription() {
    return new RedisCommandDescription(RedisCommand.SET, null);
}

public String getKeyFromData(Tuple2<String, Integer> data) {
    return data.f0;
}

public String getValueFromData(Tuple2<String, Integer> data) {
    return data.f1.toString();
}
}

}//
目录
相关文章
|
4月前
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何确保多并发sink同时更新Redis值时,数据能按事件时间有序地更新并且保持一致性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL NoSQL Redis
Flink数据问题之数据写入Redis失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
消息中间件 NoSQL Kafka
【Flink-FlinkUtils】高级自定义封装工具类实现消费kafka数据保存数据到Redis
【Flink-FlinkUtils】高级自定义封装工具类实现消费kafka数据保存数据到Redis
341 0
【Flink-FlinkUtils】高级自定义封装工具类实现消费kafka数据保存数据到Redis
|
SQL 存储 NoSQL
Flink SQL 自定义 redis connector
一般情况下,我们不需要创建新的 connector,因为 Flink SQL 已经内置了丰富的 connector 供我们使用,但是在实际生产环境中我们的存储是多种多样的,所以原生的 connector 并不能满足所有用户的需求,这个时候就需要我们自定义 connector,这篇文章的重点就是介绍一下如何实现自定义 Flink SQL connector ? 先来看一下官网的一张 connector 架构图:
Flink SQL 自定义 redis connector
|
NoSQL 网络安全 Redis
flink测试redis sink报错
flink测试redis sink报错
|
消息中间件 Web App开发 监控
Flume+Kafka+Flink+Redis构建大数据实时处理系统:实时统计网站PV、UV展示
1.大数据处理的常用方法 大数据处理目前比较流行的是两种方法,一种是离线处理,一种是在线处理,基本处理架构如下: 在互联网应用中,不管是哪一种处理方式,其基本的数据来源都是日志数据,例如对于web应用来说,则可能是用户的访问日志、用户的点击日志等。
23152 15
|
22天前
|
canal 缓存 NoSQL
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
根据对一致性的要求程度,提出多种解决方案:同步删除、同步删除+可靠消息、延时双删、异步监听+可靠消息、多重保障方案
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
|
2月前
|
缓存 NoSQL Java
Redis深度解析:解锁高性能缓存的终极武器,让你的应用飞起来
【8月更文挑战第29天】本文从基本概念入手,通过实战示例、原理解析和高级使用技巧,全面讲解Redis这一高性能键值对数据库。Redis基于内存存储,支持多种数据结构,如字符串、列表和哈希表等,常用于数据库、缓存及消息队列。文中详细介绍了如何在Spring Boot项目中集成Redis,并展示了其工作原理、缓存实现方法及高级特性,如事务、发布/订阅、Lua脚本和集群等,帮助读者从入门到精通Redis,大幅提升应用性能与可扩展性。
60 0
|
22天前
|
存储 NoSQL Redis
SpringCloud基础7——Redis分布式缓存,RDB,AOF持久化+主从+哨兵+分片集群
Redis持久化、RDB和AOF方案、Redis主从集群、哨兵、分片集群、散列插槽、自动手动故障转移
SpringCloud基础7——Redis分布式缓存,RDB,AOF持久化+主从+哨兵+分片集群
|
1天前
|
缓存 NoSQL Java
Springboot自定义注解+aop实现redis自动清除缓存功能
通过上述步骤,我们不仅实现了一个高度灵活的缓存管理机制,还保证了代码的整洁与可维护性。自定义注解与AOP的结合,让缓存清除逻辑与业务逻辑分离,便于未来的扩展和修改。这种设计模式非常适合需要频繁更新缓存的应用场景,大大提高了开发效率和系统的响应速度。
8 2
下一篇
无影云桌面