Sink
Flink没有类似spark中foreach方法 让用户进行迭代操作 虽有对外的输出操作 都要利用Sink完成 最后通过类似如下方式完成整个任务最终输出操作
stream.addSink(new MySink(xxxx))
官方提供了一部分框架的Sink 除此之外 需要用户自定义实现sink
- 既然从kafka sensor主题中消费消息 所以需要有一个往该队列中发送消息的生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sensor
- 往sinkTest主题中生产消息 所以需要有一个监听该主题的消费者
./bin/kafka-console-consumer.sh --bootstrap-sever localhost:9092 --topic sinkTest
具体演示过程在这里面有详细说明Flink原理简介和使用
Redis Sink
- 源码
https://gitee.com/pingfanrenbiji/Flink-UserBehaviorAnalysis/blob/master/FlinkTutorial/src/main/scala/com/xdl/apitest/sinktest/RedisSinkTest.scala
- 源码分析
- set和hset比较
set : 1、普通的key-value方式存储数据 2、可以设置过期时间 3、时间复杂度为O(1) 4、每执行一个set 在redis中就会多一个key hset: 1、以hash散列表的形式存储 2、超时时间只能设置在大key上 3、单个filed则不能设置超时时间 4、时间复杂度是O(N) N是单个hash上filed的个数 5、hash上不适合存储大量的filed 多了比较消耗cpu 6、但以散列表存储比较节省内存 使用场景总结: 1、在实际的使用过程中 使用set应该保存单个大文本非结构化数据 2、hset则存储结构化数据 一个hash存储一条数据 一个filed存储一条数据中的一个属性 value则是属性对应的值 举例说明: 用户表 id,name,age,sex 1、1,张三,16,1 2、2,李四,22,1 3、3,王五,28,0 4、4,赵六,32,1 如果要整表缓存到 redis 中则使用 hash ,一条数据一个hash 一个hash 里则包含4个filed。 hset user_1 id 1 name 张三 age 16 sex 1 hset user_2 id 2 name 李四 age 16 sex 1 如果用户的某个属性值改变,还可以单个修改 把张三的年龄改为30 则可以使用命令 hset user_1 age 30
set存储举例: 1、缓存应用整个首页 html 2、某个商品的详情介绍 a、一般来说商品的详情介绍是makdown语法的富文本信息 b、html 格式的富文本信息 3、应用中的 某个热点数据