Storm与MySQL的集成

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 笔记

基于Maven构建环境

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hbase</artifactId>
    <version>1.2.3</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-jdbc</artifactId>
    <version>1.2.3</version>
</dependency>

Storm集成MySQL程序开发

package com.kfk.stormMysql;
import com.google.common.collect.Lists;
import com.kfk.stormKafka.TridentKafkaConsumerTopology;
import com.kfk.stormKafka.TridentKafkaWordCount;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.jdbc.trident.state.JdbcQuery;
import org.apache.storm.jdbc.trident.state.JdbcState;
import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
import org.apache.storm.shade.com.google.common.collect.Maps;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import java.util.List;
import java.util.Map;
public class StormMySQLToplogy {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(5);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wordCounter", conf, getTopology());
    }
    public static  StormTopology getTopology() {
        TridentTopology topology = new TridentTopology();
        Map hikariConfigMap = Maps.newHashMap();
        hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
        hikariConfigMap.put("dataSource.url", "jdbc:mysql://bigdata-pro-m01/storm");
        hikariConfigMap.put("dataSource.user","root");
        hikariConfigMap.put("dataSource.password","199911");
        ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
        List<Column> columnSchema = Lists.newArrayList(
                new Column("order_date", java.sql.Types.VARCHAR),
                new Column("order_amt", java.sql.Types.VARCHAR));
        JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
        JdbcState.Options options = new JdbcState.Options()
                .withConnectionProvider(connectionProvider)
                .withMapper(simpleJdbcMapper)
                .withTableName("stormMysql_test");
        JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
        String zkUrl = "bigdata-pro-m01:2181,bigdata-pro-m02:2181,bigdata-pro-m03:2181";
        TransactionalTridentKafkaSpout kafkaSpout =  new TransactionalTridentKafkaSpout(TridentKafkaWordCount.newTridentKafkaConfig(zkUrl));
        TridentState  tridentState = topology.newStream("userSpout", kafkaSpout)
                .each(new Fields("str"),new TridentKafkaConsumerTopology.MyFunction(),new Fields("order_date","order_amt"))
                .groupBy(new Fields("order_date"))
        .persistentAggregate(jdbcStateFactory, new Fields("order_date","order_amt"),  new MySum(), new Fields("_order_amt"));
        topology.newDRPCStream("str").stateQuery(tridentState,new JdbcQuery(),new Fields(""));
        return topology.build();
    }
    public static class MySum implements CombinerAggregator {
        @Override
        public Object init(TridentTuple tuple) {
            long _amt = Long.parseLong(tuple.getStringByField("order_amt"));
            return _amt;
        }
        @Override
        public Object combine(Object val1, Object val2) {
            return (long)val1 + (long)val2;
        }
        @Override
        public Object zero() {
            return 0L;
        }
    }
}


相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
6月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
630 0
|
7月前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
1723 45
|
7月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
578 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
6月前
|
关系型数据库 MySQL 数据库
|
6月前
|
关系型数据库 MySQL OLAP
无缝集成 MySQL,解锁秒级 OLAP 分析性能极限,完成任务可领取三合一数据线!
通过 AnalyticDB MySQL 版、DMS、DTS 和 RDS MySQL 版协同工作,解决大规模业务数据统计难题,参与活动完成任务即可领取三合一数据线(限量200个),还有机会抽取蓝牙音箱大奖!
|
8月前
|
Cloud Native 关系型数据库 MySQL
无缝集成 MySQL,解锁秒级数据分析性能极限
在数据驱动决策的时代,一款性能卓越的数据分析引擎不仅能提供高效的数据支撑,同时也解决了传统 OLTP 在数据分析时面临的查询性能瓶颈、数据不一致等挑战。本文将介绍通过 AnalyticDB MySQL + DTS 来解决 MySQL 的数据分析性能问题。
|
10月前
|
关系型数据库 MySQL PHP
PHP与MySQL的无缝集成:构建动态网站的艺术####
本文将深入探讨PHP与MySQL如何携手合作,为开发者提供一套强大的工具集,以构建高效、动态且用户友好的网站。不同于传统的摘要概述,本文将以一个生动的案例引入,逐步揭示两者结合的魅力所在,最终展示如何通过简单几步实现数据驱动的Web应用开发。 ####
|
11月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
10月前
|
消息中间件 监控 Java
您是否已集成 Spring Boot 与 ActiveMQ?
您是否已集成 Spring Boot 与 ActiveMQ?
307 0
|
监控 druid Java
spring boot 集成配置阿里 Druid监控配置
spring boot 集成配置阿里 Druid监控配置
930 6

热门文章

最新文章

推荐镜像

更多