Common Flink Sinks (Elasticsearch (ES) Sink, RedisSink, KafkaSink, MysqlSink, FileSink)


Flink output to Elasticsearch, Redis, MySQL, Kafka, and files.


Table of Contents

Configure the pom file

Common entity class

KafkaSink

ElasticsearchSink(EsSink)

RedisSink

MysqlSink(JdbcSink)

FileSink

Prepare the relevant environments (Kafka, Elasticsearch, Redis, MySQL, etc.) in advance.


Configure the pom file


 <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>
    <dependencies>
        <!-- Flink core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
        <!-- Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
        <!-- Redis connector (Apache Bahir) -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <!-- Elasticsearch connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- MySQL (JDBC connector + driver) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

Common entity class

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
// POJO shared by all the sink examples; Lombok generates getters/setters, constructors and toString()
@Data
@NoArgsConstructor
@ToString
@AllArgsConstructor
public class UserEvent {
    private String userName;
    private String url;
    private Long timestamp;
}
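
If you would rather not depend on Lombok, a hand-written equivalent works just as well. A minimal sketch (not from the original article) showing only the members the sink examples rely on:

// Hand-written equivalent of the Lombok POJO above (sketch; setters omitted for brevity)
public class UserEvent {
    private String userName;
    private String url;
    private Long timestamp;
    public UserEvent() { }
    public UserEvent(String userName, String url, Long timestamp) {
        this.userName = userName;
        this.url = url;
        this.timestamp = timestamp;
    }
    public String getUserName() { return userName; }
    public String getUrl() { return url; }
    public Long getTimestamp() { return timestamp; }
    @Override
    public String toString() {
        return "UserEvent(userName=" + userName + ", url=" + url + ", timestamp=" + timestamp + ")";
    }
}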

KafkaSink

Write the data to Kafka. Start a Kafka console consumer first, then run the program.

import com.event.UserEvent;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Arrays;
import java.util.Properties;
public class KafkaSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Kafka consumer configuration; it is not used by the producer sink below and is only
        // needed when reading from Kafka (see the read-back sketch after this example)
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop01:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        DataStreamSource<String> stream = env.fromCollection(Arrays.asList(
                "xiaoming,www.baidu.com,1287538716253",
                "Mr Li,www.baidu.com,1287538710000",
                "Mr Zhang,www.baidu.com,1287538710900"
        ));
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // Parse "name,url,timestamp" into a UserEvent and emit its toString() form
                String[] split = value.split(",");
                return new UserEvent(split[0].trim(), split[1].trim(), Long.valueOf(split[2].trim())).toString();
            }
        });
        // Command to start a Kafka console consumer:
        // ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events
        result.addSink(new FlinkKafkaProducer<String>(
                // Kafka broker address
                "hadoop01:9092",
                // Target topic
                "events",
                new SimpleStringSchema()
        ));
        env.execute();
    }
}
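
If you prefer to verify the write from inside Flink rather than with the console consumer, the same connector's FlinkKafkaConsumer can read the topic back. This is a minimal sketch under the assumptions of the example above (broker hadoop01:9092, topic events), not part of the original code:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaReadBackTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop01:9092");
        props.setProperty("group.id", "events-verify");
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
        // Read the topic from the beginning so the records written by KafkaSinkTest are printed too
        consumer.setStartFromEarliest();
        env.addSource(consumer).print();
        env.execute();
    }
}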

Result

(Screenshot omitted: the messages appear in the Kafka console consumer.)


ElasticsearchSink(EsSink)


Write the data to Elasticsearch.

Example code

import com.event.UserEvent;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
public class EsSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<UserEvent> userEventDataStreamSource =
                env.fromCollection(
                        Arrays.asList(
                                new UserEvent("zhangsan", "path?test123", System.currentTimeMillis() - 2000L),
                                new UserEvent("zhangsan", "path?test", System.currentTimeMillis() + 2000L),
                                new UserEvent("lisi", "path?checkParam", System.currentTimeMillis()),
                                new UserEvent("bob", "path?test", System.currentTimeMillis() + 2000L),
                                new UserEvent("mary", "path?checkParam", System.currentTimeMillis()),
                                new UserEvent("lisi", "path?checkParam123", System.currentTimeMillis() - 2000L)
                        ));
        // Define the list of Elasticsearch hosts
        List<HttpHost> hosts = Arrays.asList(new HttpHost("hadoop01", 9200));
        // Define the ElasticsearchSinkFunction that turns each record into an IndexRequest
        ElasticsearchSinkFunction<UserEvent> elasticsearchSinkFunction = new ElasticsearchSinkFunction<UserEvent>() {
            @Override
            public void process(UserEvent userEvent, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                IndexRequest indexRequest = Requests.indexRequest()
                        .index("events")
                        // document types are deprecated in Elasticsearch 7; this call can be omitted
                        .type("type")
                        .source(new HashMap<String, String>() {{
                            put(userEvent.getUserName(), userEvent.getUrl());
                        }});
                requestIndexer.add(indexRequest);
            }
        };
        // Write to Elasticsearch
        userEventDataStreamSource.addSink(new ElasticsearchSink.Builder<>(hosts, elasticsearchSinkFunction).build());
        env.execute();
    }
}
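
With only a handful of test records, it can help to flush every action immediately instead of waiting for the connector's bulk threshold. A sketch of the same sink with an explicit flush setting, reusing the hosts and elasticsearchSinkFunction variables from the example above (an illustrative addition, not part of the original):

ElasticsearchSink.Builder<UserEvent> esSinkBuilder =
        new ElasticsearchSink.Builder<>(hosts, elasticsearchSinkFunction);
// Flush after every single action so each document becomes visible in Elasticsearch right away
esSinkBuilder.setBulkFlushMaxActions(1);
userEventDataStreamSource.addSink(esSinkBuilder.build());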

Commands to verify the data

GET _cat/indices
GET _cat/indices/events
GET events/_search

Result

(Screenshot omitted: the indexed documents are returned by GET events/_search.)


RedisSink


Write the data to Redis.

Example code

import com.event.UserEvent;
import my.test.source.CustomSouce;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class RedisSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<UserEvent> streamSource = env.addSource(new CustomSouce());
        // Create the Jedis connection configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("master")
                .setTimeout(10000)
                .setPort(6379)
                .build();
        // Write to Redis
        streamSource.addSink(new RedisSink<>(config, new MyRedisMapper()));
        env.execute();
    }
    public static class MyRedisMapper implements RedisMapper<UserEvent> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // Write with HSET; the additionalKey parameter ("events") is the Redis hash the data is stored in
            return new RedisCommandDescription(RedisCommand.HSET, "events");
        }
        @Override
        public String getKeyFromData(UserEvent userEvent) {
            return userEvent.getUserName();
        }
        @Override
        public String getValueFromData(UserEvent userEvent) {
            return userEvent.getUrl();
        }
    }
}

Custom source

import com.event.UserEvent;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Calendar;
import java.util.Random;
public class CustomSouce implements SourceFunction<UserEvent> {
    // Flag that controls whether the source keeps generating data
    private Boolean running = true;
    @Override
    public void run(SourceContext<UserEvent> ctx) throws Exception {
        Random random = new Random(); // randomly pick values from the predefined arrays below
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1",
                "./prod?id=2"};
        while (running) {
            ctx.collect(new UserEvent(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()
            ));
            // Emit one click event per second so the output is easy to observe
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        running = false;
    }
}
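
To check the written data programmatically instead of through redis-cli, the Jedis client (which the Bahir Redis connector brings in transitively) can read the hash back. A minimal sketch, assuming Redis runs on the host "master" used above:

import redis.clients.jedis.Jedis;
import java.util.Map;
public class RedisReadBackTest {
    public static void main(String[] args) {
        // Connect to the same Redis instance RedisSinkTest writes to
        Jedis jedis = new Jedis("master", 6379);
        // The sink issues HSET events <userName> <url>, so read the whole "events" hash back
        Map<String, String> events = jedis.hgetAll("events");
        events.forEach((user, url) -> System.out.println(user + " -> " + url));
        jedis.close();
    }
}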

Result

Because the source above is unbounded, the values stored in Redis keep changing over time.

(Screenshot omitted.)


MysqlSink(JdbcSink)

Write the data to MySQL.

Table schema

create table events(
    user_name varchar(20) not null,
    url varchar(100) not null
);

Example code

import com.event.UserEvent;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
public class MysqlSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Sample data
        DataStreamSource<UserEvent> userEventDataStreamSource =
                env.fromCollection(
                        Arrays.asList(
                                new UserEvent("zhangsan", "/path?test123", System.currentTimeMillis() - 2000L),
                                new UserEvent("zhangsan", "/path?test", System.currentTimeMillis() + 2000L),
                                new UserEvent("lisi", "/path?checkParam", System.currentTimeMillis()),
                                new UserEvent("bob", "/path?test", System.currentTimeMillis() + 2000L),
                                new UserEvent("mary", "/path?checkParam", System.currentTimeMillis()),
                                new UserEvent("lisi", "/path?checkParam123", System.currentTimeMillis() - 2000L)
                        ));
        userEventDataStreamSource.addSink(JdbcSink.sink(
                // SQL statement to execute
                "INSERT INTO events (user_name, url) VALUES(?, ?)",
                new JdbcStatementBuilder<UserEvent>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UserEvent userEvent) throws SQLException {
                        // Bind the SQL placeholders
                        preparedStatement.setString(1, userEvent.getUserName());
                        preparedStatement.setString(2, userEvent.getUrl());
                    }
                },
                // JDBC connection options
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://hadoop01:3306/mysql")
                        .withUsername("root")
                        .withPassword("123456")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .build()
        ));
        env.execute();
    }
}
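
JdbcSink.sink also has an overload that takes JdbcExecutionOptions, which controls batching and retries (the connector's defaults apply when it is omitted, as above). A sketch of the same call with explicit options, requiring an extra import of org.apache.flink.connector.jdbc.JdbcExecutionOptions (illustrative values, not from the original):

userEventDataStreamSource.addSink(JdbcSink.sink(
        "INSERT INTO events (user_name, url) VALUES(?, ?)",
        new JdbcStatementBuilder<UserEvent>() {
            @Override
            public void accept(PreparedStatement preparedStatement, UserEvent userEvent) throws SQLException {
                preparedStatement.setString(1, userEvent.getUserName());
                preparedStatement.setString(2, userEvent.getUrl());
            }
        },
        // Buffer up to 100 rows, flush at least every 200 ms, and retry a failed batch up to 3 times
        JdbcExecutionOptions.builder()
                .withBatchSize(100)
                .withBatchIntervalMs(200)
                .withMaxRetries(3)
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://hadoop01:3306/mysql")
                .withUsername("root")
                .withPassword("123456")
                .withDriverName("com.mysql.jdbc.Driver")
                .build()
));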

After the program finishes, the new rows appear in the MySQL events table.

(Screenshot omitted.)


FileSink


Write the data to files (the output can be partitioned into bucket directories).

import com.event.UserEvent;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class FileSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<UserEvent> userEventDataStreamSource =
                env.fromCollection(
                        Arrays.asList(
                                new UserEvent("zhangsan", "path?test123", System.currentTimeMillis() - 2000L),
                                new UserEvent("zhangsan", "path?test", System.currentTimeMillis() + 2000L),
                                new UserEvent("lisi", "path?checkParam", System.currentTimeMillis()),
                                new UserEvent("bob", "path?test", System.currentTimeMillis() + 2000L),
                                new UserEvent("mary", "path?checkParam", System.currentTimeMillis()),
                                new UserEvent("lisi", "path?checkParam123", System.currentTimeMillis() - 2000L)
                        ));
        StreamingFileSink<String> streamingFileSink = StreamingFileSink
                .<String>forRowFormat(new Path("./output/"), new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                // Roll the part file once it reaches 1 GB
                                .withMaxPartSize(1024 * 1024 * 1024)
                                // Roll the part file every 15 minutes
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                // Roll the part file after 5 minutes without new data
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .build()
                ).build();
        userEventDataStreamSource.map(data -> data.getUserName()).addSink(streamingFileSink);
        env.execute();
    }
}
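
The partitioned output mentioned above comes from bucket assignment: StreamingFileSink places each record into a bucket directory (by default one per hour). To make the partitioning explicit, a DateTimeBucketAssigner can be set on the builder; a sketch (an addition for illustration, requiring an extra import of org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner):

StreamingFileSink<String> partitionedSink = StreamingFileSink
        .<String>forRowFormat(new Path("./output/"), new SimpleStringEncoder<>("UTF-8"))
        // Write records into one sub-directory per hour, e.g. ./output/2023-01-01--12/
        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
        .withRollingPolicy(DefaultRollingPolicy.builder().build())
        .build();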

After the run finishes, part files appear under the ./output/ directory.

(Screenshot omitted.)

实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。