flink自定义sink,以入solr、mongodb为例

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
实时计算 Flink 版,5000CU*H 3个月
简介: 自定义sink需要继承 RichSinkFunction,重写open、invoke、close三个方法,open方法主要实现一些公共资源的开启工作,如mongo、solr的连接客户端。invoke会在每条数据进入后调用,主要写一些数据的转化、插入、查询等具体的实际业务。

flink在对很多数据库sink的时候都提供了connector,比如:es、kafka等。
但我们有些场景不仅没有对应的sink,而且有时候还需要在sink的时候还有做一些查询工作。

自定义sink需要继承 RichSinkFunction,重写open、invoke、close三个方法,open方法主要实现一些公共资源的开启工作,如mongo、solr的连接客户端。invoke会在每条数据进入后调用,主要写一些数据的转化、插入、查询等具体的实际业务。

下面我给大家两个实例,一个是sink solr的,一个是sink mongo的。

1、solr示例:

这里涉及到solr的客户端操作,我是使用solrj来封装的。

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import java.util.*;

public class SolrSink extends RichSinkFunction<ExempleBean> {

    private HttpSolrClient solrClient;

    private String username;

    private String password;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        client();

    }

    @Override
    public void invoke(ExempleBean value, Context context) throws Exception {
        String queryString = "id:1";

        try {
            List<ExempleBean> exeples = new QueryRequest(
                        new SolrQuery(queryString).setRows(1)
                ).process(solrClient, "fd_table").getBeans(ExempleBean.class);
            
            for (ExempleBean exem: exeples) {
                //将javaBean转化提交数据
                UpdateRequest request = new UpdateRequest();
                request.setBasicAuthCredentials(username, password);
                request.add(solrClient.getBinder().toSolrInputDocument(exem));
                request.commit(solrClient, "fd_table");
                System.out.println("执行完成");
            }
            

         
        } catch (Exception e) {
            if (null != solrClient) {
                solrClient.close();
                solrClient = null;
                System.out.println("里面关闭啦!");
            }
            client();
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (null != solrClient) {
                solrClient.close();
                solrClient = null;
                System.out.println("关闭啦!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void client() {
        synchronized (this) {
            if (null == solrClient) {

                //加载了配置文件当中solr的username、password
                Map<String, String> keyMap = SolrClientUtil.getSolrUserPassword(
                        "com.medbook.en.solr.username",
                        "com.medbook.en.solr.password");
                username = keyMap.get("username");
                password = keyMap.get("password");
                //加载solr连接的url
                Properties prop = ConfigUtil.getProperties();
                String solrUrl = prop.get("com.medbook.en.solr.url").toString();
                solrClient = new HttpSolrClient.Builder(solrUrl)
                        //连接超过时间没有跑完就会退出
                        .withConnectionTimeout(360000)
                        .withSocketTimeout(360000)
                        .build();
                System.out.println("创建了连接");
            }
        }
    }

}

mongo示例:
这个查询了mongo中的数据,转化长对应的javaBean,更新了传入的javaBean以后,转化成document,存入mongodb。


import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.medbook.Bo.ExampleBean;

import com.medbook.utils.ConfigUtil;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.util.*;

public class MongoSink extends RichSinkFunction<ExampleBean> {

    private MongoClient mongoClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        client();
    }

    @Override
    public void invoke(ExampleBean bean, Context context) throws Exception {
        try {

            MongoDatabase db = mongoClient.getDatabase("example_data");
            MongoCollection<Document> tableColl = db.getCollection("fd_table");
            MongoCollection<Document> relationColl = db.getCollection("fd_relation");
            
            //让gson转化的json字符串的时间按照指定的格式
            Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();

            Bson jourQuery = Filters.eq("id:2");
            Document doc = relationColl.find(jourQuery).first();
            //将查出来mongo中的document转化为javaBean
            RelationBean relationBean = gson.fromJson(doc.toJson(), RelationBean.class);
            //将bean关联加入查出的某个字段
            bean.setName(relationBean.getName());
            
            // 将javabean转化成mongo的document,先使用gson转化成Json,然后解析成Document
            String json = gson.toJson(bean);
            Document document = Document.parse(json);
            
            //将document更新到mongo的fd_table表中
            Bson query = Filters.eq("id", bean.getId());
            tableColl.replaceOne(query, document, new UpdateOptions().upsert(true));
            System.out.println("执行完成!");    

        } catch (Exception e) {
            if (null != mongoClient) {
                mongoClient.close();
                mongoClient = null;
                System.out.println("里面关闭啦!");
            }
            client();
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (null != mongoClient) {
                mongoClient.close();
                mongoClient = null;
                System.out.println("关闭啦!");
            }
        } catch (Exception e) {
//            e.printStackTrace();
        }
    }

    public void client() {
        synchronized (this) {
            if (null == mongoClient) {
                Properties prop = ConfigUtil.getProperties();
                //获取配置文件中的mongo的URI
                MongoClientURI mongoClientURI = new MongoClientURI(prop.getProperty("com.medbook.mongo.foreign.uri"));
                mongoClient = new MongoClient(mongoClientURI);
            }
        }
    }
}
相关实践学习
MongoDB数据库入门
MongoDB数据库入门实验。
快速掌握 MongoDB 数据库
本课程主要讲解MongoDB数据库的基本知识,包括MongoDB数据库的安装、配置、服务的启动、数据的CRUD操作函数使用、MongoDB索引的使用(唯一索引、地理索引、过期索引、全文索引等)、MapReduce操作实现、用户管理、Java对MongoDB的操作支持(基于2.x驱动与3.x驱动的完全讲解)。 通过学习此课程,读者将具备MongoDB数据库的开发能力,并且能够使用MongoDB进行项目开发。 &nbsp; 相关的阿里云产品:云数据库 MongoDB版 云数据库MongoDB版支持ReplicaSet和Sharding两种部署架构,具备安全审计,时间点备份等多项企业能力。在互联网、物联网、游戏、金融等领域被广泛采用。 云数据库MongoDB版(ApsaraDB for MongoDB)完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份回滚、性能优化等解决方案。 产品详情: https://www.aliyun.com/product/mongodb
目录
相关文章
|
2月前
|
消息中间件 分布式计算 大数据
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
54 0
|
2月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
203 0
|
2月前
|
分布式计算 监控 大数据
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
61 0
|
2月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
200 0
|
4月前
|
SQL 关系型数据库 测试技术
实时数仓 Hologres操作报错合集之执行Flink的sink操作时出现报错,是什么原因
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
4月前
|
存储 SQL Java
实时数仓 Hologres产品使用合集之如何使用Flink的sink连接
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
5月前
|
消息中间件 NoSQL Redis
实时计算 Flink版产品使用问题之配置了最大连续失败数不为1,在Kafka的精准一次sink中,如果ck失败了,这批数据是否会丢失
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在自定义RichSinkFunction中,如何获取source的schema
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何确保多并发sink同时更新Redis值时,数据能按事件时间有序地更新并且保持一致性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
消息中间件 SQL 数据处理
实时计算 Flink版产品使用问题之sink多个并行度写入rabbit mq会导致顺序性问题吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。