Flink-05 Flink Java 3分钟上手 Redis FlinkJedisPoolConfig 从Kafka写入Redis FlinkKafkaConsumer消费 结果写入Redis

简介: Flink-05 Flink Java 3分钟上手 Redis FlinkJedisPoolConfig 从Kafka写入Redis FlinkKafkaConsumer消费 结果写入Redis

代码仓库

会同步代码到 GitHub

https://github.com/turbo-duck/flink-demo

内容介绍

上节我们已经实现了,对Kafka数据的消费和计算,最终把结果输出到了控制台上。如下图:

Kafka In Docker

TestKafkaProducer

数据写入到Kafka中的效果

FlinkConsumer

Flink消费Kafka的效果如下图,已经按照我们的需求进行计算了。

这节内容

本节依然使用Flink对Kafka进行消费,但与上节不同的是(上节将结果输出到控制台上),本节将把Flink计算的结果输出到Redis中进行保存(当然也可以存储到别的地方,这里以Redis为例)。


pom.xml

重点关注 flink-connector-redis_2.11 这个包。这是Redis相关的依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-demo-01</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.2</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>

    </dependencies>
</project>

KafkaProducer.java

生产数据存入到Kafka这种

package icu.wzk.demo05;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestKafkaProducer {

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "0.0.0.0:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 500; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("test", key, value);
            producer.send(record);
            System.out.println("send: " + key);
            Thread.sleep(200);
        }
        producer.close();
    }

}

StartApp

Flink消费Kafka,计算后写入到Redis中。

FlinkJedisPoolConfig

连接池的配置

MyRedisMapper

自定义的Mapper,需要实现RedisMapper

完整代码

package icu.wzk.demo05;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

import java.util.Properties;

public class StartApp {

    private static final String KAFKA_SERVER = "0.0.0.0:9092";

    private static final Integer KAFKA_PORT = 9092;

    private static final String KAFKA_TOPIC = "test";

    private static final String REDIS_SERVER = "0.0.0.0";

    private static final Integer REDIS_PORT = 6379;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", String.format("%s:%d", KAFKA_SERVER, KAFKA_PORT));
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(KAFKA_TOPIC, new SimpleStringSchema(), properties);
        DataStreamSource<String> data = env.addSource(consumer);

        SingleOutputStreamOperator<Tuple2<String, String>> wordData = data.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                return new Tuple2<>("l_words", value);
            }
        });

        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig
                .Builder()
                .setHost(REDIS_SERVER)
                .setPort(REDIS_PORT)
                .build();
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        wordData.addSink(redisSink);
        env.execute();
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }

        @Override
        public String getKeyFromData(Tuple2<String,String> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String,String> data) {
            return data.f1;
        }
    }

}


相关文章
|
25天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
17天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
4天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
1天前
|
人工智能 Rust Java
10月更文挑战赛火热启动,坚持热爱坚持创作!
开发者社区10月更文挑战,寻找热爱技术内容创作的你,欢迎来创作!
261 12
|
19天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
21天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2582 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
3天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
169 2
|
1天前
|
编译器 C#
C#多态概述:通过继承实现的不同对象调用相同的方法,表现出不同的行为
C#多态概述:通过继承实现的不同对象调用相同的方法,表现出不同的行为
101 65
|
21天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1578 16
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
|
5天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
258 2