Has anyone written a RocketMQ SQL connector?
You can just model it on the Kafka connector; they're essentially identical.
This answer comes from the DingTalk group "[2] Apache Flink China Community".
There are two ways to implement this. Option 1: use Apache Flink. Step 1: add the RocketMQ and Flink dependencies to pom.xml:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-flink</artifactId>
<version>{rocketmq-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>{flink-version}</version>
</dependency>
Step 2: create a RocketMQ source in Flink to read the data from RocketMQ, for example:
DataStream<String> stream = env.addSource(new FlinkRocketMQConsumer<String>(
        rocketMQConfig, new SimpleStringSchema(), /*your topic*/, /*other config*/));
Step 3: use the Flink JDBC connector to write the data into a MySQL database, for example:
stream.addSink(JdbcSink.sink(
"INSERT INTO /*your table*/ (/*column1*/, /*column2*/) VALUES (?, ?)",
new JdbcStatementBuilder<String>() {
public void accept(PreparedStatement preparedStatement, String s) throws SQLException {
preparedStatement.setString(1, /*get column1 from s*/);
preparedStatement.setString(2, /*get column2 from s*/);
}
},
JdbcExecutionOptions.builder()
.withBatchSize(/*your batch size*/)
.withBatchIntervalMs(/*your batch interval*/)
.withMaxRetries(/*your max retries*/)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(/*your MySQL url*/)
.withDriverName(/*your MySQL driver name*/)
.withUsername(/*your MySQL username*/)
.withPassword(/*your MySQL password*/)
.build()));
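The `/*get column1 from s*/` placeholders above have to extract fields from whatever format the RocketMQ messages carry. As a minimal sketch, assuming the messages are flat JSON objects with string values (the field names `id` and `name` below are made up for illustration; a real job would use a JSON library such as Jackson):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Minimal field extractor for flat JSON messages such as {"id":"1","name":"foo"}.
// Only handles string-valued fields; a production job should use a real JSON parser.
public class MessageFields {
    private static final Pattern FIELD =
            Pattern.compile("\"(\\w+)\"\\s*:\\s*\"([^\"]*)\"");

    public static Map<String, String> parse(String message) {
        Map<String, String> fields = new HashMap<>();
        Matcher m = FIELD.matcher(message);
        while (m.find()) {
            fields.put(m.group(1), m.group(2));
        }
        return fields;
    }
}
```

Inside the `JdbcStatementBuilder`, something like `parse(s).get("id")` would then supply the value bound to the first `?` placeholder.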
Option 2: use Apache Kafka Connect. Step 1: configure the RocketMQ connector in Kafka Connect, for example:
name=rocketmq-sink
connector.class=io.openmessaging.connect.runtime.sink.OpenMessagingSinkConnector
tasks.max=1
topics=/*your topic*/
rocketmq.endpoint=/*your endpoint*/
rocketmq.accessKeyId=/*your accessKeyId*/
rocketmq.accessKeySecret=/*your accessKeySecret*/
rocketmq.namespace=/*your namespace*/
Step 2: configure the JDBC connector in Kafka Connect to write the data into the MySQL database, for example:
name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=/*your topic*/
connection.url=/*your MySQL url*/
connection.user=/*your MySQL username*/
connection.password=/*your MySQL password*/
auto.create=true
insert.mode=upsert
table.name.format=/*your table name format*/
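Property files like the two above are easy to get subtly wrong (a missing `connection.url` only surfaces at connector startup). As a hedged sketch, they can be sanity-checked before deployment with plain `java.util.Properties`; the required key names below are taken from the JDBC sink config above:

```java
import java.io.StringReader;
import java.util.Properties;

// Loads a Kafka Connect-style property file and checks that the keys the
// JDBC sink config above relies on are present.
public class SinkConfigCheck {
    private static final String[] REQUIRED = {
            "name", "connector.class", "topics", "connection.url"
    };

    public static Properties load(String config) throws Exception {
        Properties props = new Properties();
        props.load(new StringReader(config));
        return props;
    }

    public static boolean hasRequiredKeys(Properties props) {
        for (String key : REQUIRED) {
            if (props.getProperty(key) == null) {
                return false;
            }
        }
        return true;
    }
}
```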
The implementation details can be adjusted to fit your specific business requirements.
Hi, you can consume data from RocketMQ very conveniently with the fire framework:
val dstream = this.fire.createRocketMqPullStream()
and then register the table with:
this.fire.sql("""
|CREATE TABLE source (
| id bigint,
| name string,
| age int,
| length double,
| data DECIMAL(10, 5)
|) WITH
| (
| 'connector' = 'fire-rocketmq',
| 'format' = 'json',
| 'rocket.brokers.name' = 'zms',
| 'rocket.topics' = 'fire',
| 'rocket.group.id' = 'fire',
| 'rocket.consumer.tag' = '*'
| )
|""".stripMargin)
Note that the WITH options of the RocketMQ SQL connector reuse the configuration parameters of the API.