开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

大家有写过rocketmq的sql connector 吗?

大家有写过rocketmq的sql connector 吗?

展开
收起
巴拉巴拉巴拉 2023-02-22 09:55:43 953 6
5 条回答
写回答
取消 提交回答
  • 发表文章、提出问题、分享经验、结交志同道合的朋友 有两种方式可以实现,手打不易,望采纳!!! 1、可以使用 Apache Flink 实现 参考如下: 1、首先在pom.xml中添加RocketMQ和Flink的依赖:

        org.apacherocketmq-flink     {rocketmq-version}

        org.apache.flink     flink-connector-jdbc     {flink-version}

    2、在Flink中创建一个RocketMQ source,将数据从RocketMQ读入到Flink中,例如:

    DataStream stream = env.addSource(new FlinkRocketMQConsumer(         rocketMQConfig, new SimpleStringSchema(), RocketMQBuiltinTopic./your topic/,/other config/));

    3、在Flink中使用JDBC connector将数据写入到MySQL数据库中,例如:

    stream.addSink(JdbcSink.sink(     "INSERT INTO /your table/ (/column1/, /column2/) VALUES (?, ?)",     new JdbcStatementBuilder() {         public void accept(PreparedStatement preparedStatement, String s) throws SQLException {             preparedStatement.setString(1, /get column1 from s/);             preparedStatement.setString(2, /get column2 from s/);         }     },     JdbcExecutionOptions.builder()         .withBatchSize(/your batch size/)         .withBatchIntervalMs(/your batch interval/)         .withMaxRetries(/your max retries/)         .build(),     new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()         .withUrl(/your MySQL url/)         .withDriverName(/your MySQL driver name/)         .withUsername(/your MySQL username/)         .withPassword(/your MySQL password/)         .build()));

    2、使用 Apache Kafka Connect 实现 1、在Kafka Connect中配置RocketMQ的Connector,例如:

    name=rocketmq-sink connector.class=io.openmessaging.connect.runtime.sink.OpenMessagingSinkConnector tasks.max=1 topics=/your topic/ rocketmq.endpoint=/your endpoint/ rocketmq.accessKeyId=/your accessKeyId/ rocketmq.accessKeySecret=/your accessKeySecret/ rocketmq.namespace=/your namespace/ 2、在Kafka Connect中配置JDBC的Connector,将数据从Kafka Connect写入到MySQL数据库中,例如:

    name=jdbc-sink connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=1 topics=/your topic/ connection.url=/your MySQL url/ connection.user=/your MySQL username/ connection.password=/your MySQL password/ auto.create=true insert.mode=upsert table.name.format=/your table name format/

    具体实现方式可以根据具体的业务需求进行调整。

    2023-02-22发布于浙江 0 1

    三掌柜666

    十分耕耘,一定会有一分收获! 楼主你好,通过使用fire框架可以很便捷的消费rocketmq内的数据,

    val dstream = this.fire.createRocketMqPullStream() 然后通过使用

    jthis.fire.sql("""                     |CREATE table source (                     |  id bigint,                     |  name string,                     |  age int,                     |  length double,                     |  data DECIMAL(10, 5)                     |) WITH                     |   (                     |   'connector' = 'fire-rocketmq',                     |   'format' = 'json',                     |   'rocket.brokers.name' = 'zms',                     |   'rocket.topics'       = 'fire',                     |   'rocket.group.id'     = 'fire',                     |   'rocket.consumer.tag' = '*'                     |   )                     |""".stripMargin) 且rocketmq sql connector中的with参数复用了api中的配置参数。

    2023-02-22发布于江苏 0 1 我到底啦~ + 关注问题 复制 举报 取消

    2023-02-24 12:07:50
    赞同 展开评论 打赏
  • 为十年后栽一颗树~

    没有写过,蹲个答案~

    2023-02-23 16:16:53
    赞同 展开评论 打赏
  • 照着kafka connector写一个就行了,一模一样的

    此答案来自钉钉群“【2】Apache Flink China 社区”

    2023-02-22 19:53:56
    赞同 展开评论 打赏
  • 发表文章、提出问题、分享经验、结交志同道合的朋友

    有两种方式可以实现,手打不易,望采纳!!! 1、可以使用 Apache Flink 实现 参考如下: 1、首先在pom.xml中添加RocketMQ和Flink的依赖:

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-flink</artifactId>
        <version>{rocketmq-version}</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>{flink-version}</version>
    </dependency>
    
    

    2、在Flink中创建一个RocketMQ source,将数据从RocketMQ读入到Flink中,例如:

    DataStream<String> stream = env.addSource(new FlinkRocketMQConsumer<String>(
            rocketMQConfig, new SimpleStringSchema(), RocketMQBuiltinTopic./*your topic*/,/*other config*/));
    
    

    3、在Flink中使用JDBC connector将数据写入到MySQL数据库中,例如:

    stream.addSink(JdbcSink.sink(
        "INSERT INTO /*your table*/ (/*column1*/, /*column2*/) VALUES (?, ?)",
        new JdbcStatementBuilder<String>() {
            public void accept(PreparedStatement preparedStatement, String s) throws SQLException {
                preparedStatement.setString(1, /*get column1 from s*/);
                preparedStatement.setString(2, /*get column2 from s*/);
            }
        },
        JdbcExecutionOptions.builder()
            .withBatchSize(/*your batch size*/)
            .withBatchIntervalMs(/*your batch interval*/)
            .withMaxRetries(/*your max retries*/)
            .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl(/*your MySQL url*/)
            .withDriverName(/*your MySQL driver name*/)
            .withUsername(/*your MySQL username*/)
            .withPassword(/*your MySQL password*/)
            .build()));
    
    

    2、使用 Apache Kafka Connect 实现 1、在Kafka Connect中配置RocketMQ的Connector,例如:

    name=rocketmq-sink
    connector.class=io.openmessaging.connect.runtime.sink.OpenMessagingSinkConnector
    tasks.max=1
    topics=/*your topic*/
    rocketmq.endpoint=/*your endpoint*/
    rocketmq.accessKeyId=/*your accessKeyId*/
    rocketmq.accessKeySecret=/*your accessKeySecret*/
    rocketmq.namespace=/*your namespace*/
    

    2、在Kafka Connect中配置JDBC的Connector,将数据从Kafka Connect写入到MySQL数据库中,例如:

    name=jdbc-sink
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    tasks.max=1
    topics=/*your topic*/
    connection.url=/*your MySQL url*/
    connection.user=/*your MySQL username*/
    connection.password=/*your MySQL password*/
    auto.create=true
    insert.mode=upsert
    table.name.format=/*your table name format*/
    
    

    具体实现方式可以根据具体的业务需求进行调整。

    2023-02-22 11:54:31
    赞同 1 展开评论 打赏
  • 十分耕耘,一定会有一分收获!

    楼主你好,通过使用fire框架可以很便捷的消费rocketmq内的数据,

    val dstream = this.fire.createRocketMqPullStream()
    

    然后通过使用

    jthis.fire.sql("""
                        |CREATE table source (
                        |  id bigint,
                        |  name string,
                        |  age int,
                        |  length double,
                        |  data DECIMAL(10, 5)
                        |) WITH
                        |   (
                        |   'connector' = 'fire-rocketmq',
                        |   'format' = 'json',
                        |   'rocket.brokers.name' = 'zms',
                        |   'rocket.topics'       = 'fire',
                        |   'rocket.group.id'     = 'fire',
                        |   'rocket.consumer.tag' = '*'
                        |   )
                        |""".stripMargin)
    

    且rocketmq sql connector中的with参数复用了api中的配置参数。

    2023-02-22 10:31:14
    赞同 1 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
SQL Server 2017 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载