Storm与MySQL的集成

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 笔记

基于Maven构建环境

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hbase</artifactId>
    <version>1.2.3</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-jdbc</artifactId>
    <version>1.2.3</version>
</dependency>

Storm集成MySQL程序开发

package com.kfk.stormMysql;
import com.google.common.collect.Lists;
import com.kfk.stormKafka.TridentKafkaConsumerTopology;
import com.kfk.stormKafka.TridentKafkaWordCount;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.jdbc.trident.state.JdbcQuery;
import org.apache.storm.jdbc.trident.state.JdbcState;
import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
import org.apache.storm.shade.com.google.common.collect.Maps;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import java.util.List;
import java.util.Map;
public class StormMySQLToplogy {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(5);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wordCounter", conf, getTopology());
    }
    public static  StormTopology getTopology() {
        TridentTopology topology = new TridentTopology();
        Map hikariConfigMap = Maps.newHashMap();
        hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
        hikariConfigMap.put("dataSource.url", "jdbc:mysql://bigdata-pro-m01/storm");
        hikariConfigMap.put("dataSource.user","root");
        hikariConfigMap.put("dataSource.password","199911");
        ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
        List<Column> columnSchema = Lists.newArrayList(
                new Column("order_date", java.sql.Types.VARCHAR),
                new Column("order_amt", java.sql.Types.VARCHAR));
        JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
        JdbcState.Options options = new JdbcState.Options()
                .withConnectionProvider(connectionProvider)
                .withMapper(simpleJdbcMapper)
                .withTableName("stormMysql_test");
        JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
        String zkUrl = "bigdata-pro-m01:2181,bigdata-pro-m02:2181,bigdata-pro-m03:2181";
        TransactionalTridentKafkaSpout kafkaSpout =  new TransactionalTridentKafkaSpout(TridentKafkaWordCount.newTridentKafkaConfig(zkUrl));
        TridentState  tridentState = topology.newStream("userSpout", kafkaSpout)
                .each(new Fields("str"),new TridentKafkaConsumerTopology.MyFunction(),new Fields("order_date","order_amt"))
                .groupBy(new Fields("order_date"))
        .persistentAggregate(jdbcStateFactory, new Fields("order_date","order_amt"),  new MySum(), new Fields("_order_amt"));
        topology.newDRPCStream("str").stateQuery(tridentState,new JdbcQuery(),new Fields(""));
        return topology.build();
    }
    public static class MySum implements CombinerAggregator {
        @Override
        public Object init(TridentTuple tuple) {
            long _amt = Long.parseLong(tuple.getStringByField("order_amt"));
            return _amt;
        }
        @Override
        public Object combine(Object val1, Object val2) {
            return (long)val1 + (long)val2;
        }
        @Override
        public Object zero() {
            return 0L;
        }
    }
}


相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
打赏
0
0
0
0
12
分享
相关文章
无缝集成 MySQL,解锁秒级数据分析性能极限
在数据驱动决策的时代,一款性能卓越的数据分析引擎不仅能提供高效的数据支撑,同时也解决了传统 OLTP 在数据分析时面临的查询性能瓶颈、数据不一致等挑战。本文将介绍通过 AnalyticDB MySQL + DTS 来解决 MySQL 的数据分析性能问题。
PHP与MySQL的无缝集成:构建动态网站的艺术####
本文将深入探讨PHP与MySQL如何携手合作,为开发者提供一套强大的工具集,以构建高效、动态且用户友好的网站。不同于传统的摘要概述,本文将以一个生动的案例引入,逐步揭示两者结合的魅力所在,最终展示如何通过简单几步实现数据驱动的Web应用开发。 ####
zabbix agent集成percona监控MySQL的插件实战案例
这篇文章是关于如何使用Percona监控插件集成Zabbix agent来监控MySQL的实战案例。
114 2
zabbix agent集成percona监控MySQL的插件实战案例
SpringBoot 集成 SpringSecurity + MySQL + JWT 附源码,废话不多直接盘
SpringBoot 集成 SpringSecurity + MySQL + JWT 附源码,废话不多直接盘
198 2
SpringBoot 集成 Quartz + MySQL
SpringBoot 集成 Quartz + MySQL
143 1
如何实现Springboot+camunda+mysql的集成
【7月更文挑战第2天】集成Spring Boot、Camunda和MySQL的简要步骤: 1. 初始化Spring Boot项目,添加Camunda和MySQL驱动依赖。 2. 配置`application.properties`,包括数据库URL、用户名和密码。 3. 设置Camunda引擎属性,指定数据源。 4. 引入流程定义文件(如`.bpmn`)。 5. 创建服务处理流程操作,创建控制器接收请求。 6. Camunda自动在数据库创建表结构。 7. 启动应用,测试流程启动,如通过服务和控制器开始流程实例。 示例代码包括服务类启动流程实例及控制器接口。实际集成需按业务需求调整。
477 4
支付系统----微信支付19---集成MyBatis-plus,数据库驱动对应的依赖版本设置问题,5没版本没有cj这个依赖,mysql驱动默认的是版本8,这里是一个父类,数据库都有,写个父类,继承就行
支付系统----微信支付19---集成MyBatis-plus,数据库驱动对应的依赖版本设置问题,5没版本没有cj这个依赖,mysql驱动默认的是版本8,这里是一个父类,数据库都有,写个父类,继承就行
【深入了解MySQL】优化查询性能与数据库设计的深度总结
本文详细介绍了MySQL查询优化和数据库设计技巧,涵盖基础优化、高级技巧及性能监控。
91 0
数据库传奇:MySQL创世之父的两千金My、Maria
《数据库传奇:MySQL创世之父的两千金My、Maria》介绍了MySQL的发展历程及其分支MariaDB。MySQL由Michael Widenius等人于1994年创建,现归Oracle所有,广泛应用于阿里巴巴、腾讯等企业。2009年,Widenius因担心Oracle收购影响MySQL的开源性,创建了MariaDB,提供额外功能和改进。维基百科、Google等已逐步替换为MariaDB,以确保更好的性能和社区支持。掌握MariaDB作为备用方案,对未来发展至关重要。
64 3

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等