基于Maven构建环境
<!-- Storm HBase connector, pinned to 1.2.3 with an explicit compile scope. -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hbase</artifactId>
    <version>1.2.3</version>
    <scope>compile</scope>
</dependency>

<!-- Storm JDBC connector, pinned to 1.2.3; no explicit scope is declared. -->
<!-- NOTE(review): the Java example in this document configures
     com.mysql.jdbc.jdbc2.optional.MysqlDataSource, so a MySQL JDBC driver
     dependency presumably has to be declared somewhere in the pom - confirm. -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-jdbc</artifactId>
    <version>1.2.3</version>
</dependency>
Storm集成MySQL程序开发
package com.kfk.stormMysql;

import com.google.common.collect.Lists;
import com.kfk.stormKafka.TridentKafkaConsumerTopology;
import com.kfk.stormKafka.TridentKafkaWordCount;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.jdbc.trident.state.JdbcQuery;
import org.apache.storm.jdbc.trident.state.JdbcState;
import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
import org.apache.storm.shade.com.google.common.collect.Maps;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;

import java.util.List;
import java.util.Map;

/**
 * Example Trident topology that consumes a Kafka stream, derives
 * {@code order_date}/{@code order_amt} fields from each message, sums the
 * amounts per {@code order_date} and hands the result to a storm-jdbc state
 * backed by MySQL.
 *
 * <p>The topology is submitted to an in-process {@link LocalCluster} by
 * {@link #main(String[])}.
 */
public class StormMySQLToplogy {

    /** Name under which the topology is submitted to the local cluster. */
    private static final String TOPOLOGY_NAME = "wordCounter";

    /** Target table handed to the JDBC state options. */
    private static final String TABLE_NAME = "stormMysql_test";

    /** ZooKeeper connect string passed to the Kafka spout configuration. */
    private static final String ZK_URL =
            "bigdata-pro-m01:2181,bigdata-pro-m02:2181,bigdata-pro-m03:2181";

    /** Field carrying the grouping key. */
    private static final String FIELD_ORDER_DATE = "order_date";

    /** Field carrying the amount, as a string, that {@link MySum} parses. */
    private static final String FIELD_ORDER_AMT = "order_amt";

    /**
     * Builds the topology and submits it to a freshly created local cluster.
     *
     * <p>The cluster is never shut down here, so the JVM keeps running after
     * this method returns.
     *
     * @param args command-line arguments; not used
     * @throws Exception if cluster start-up or topology submission fails
     */
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(5);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, conf, getTopology());
    }

    /**
     * Wires the Kafka spout, the field-extraction function, the per-date sum
     * and the JDBC state into a Trident topology.
     *
     * @return the built topology, ready for submission
     */
    public static StormTopology getTopology() {
        TridentTopology topology = new TridentTopology();

        JdbcStateFactory jdbcStateFactory = newJdbcStateFactory(newConnectionProvider());

        TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(
                TridentKafkaWordCount.newTridentKafkaConfig(ZK_URL));

        // NOTE(review): a grouped persistentAggregate appears to expect a
        // MapState-producing factory, while JdbcStateFactory creates a plain
        // JdbcState; the storm-jdbc docs show partitionPersist + JdbcUpdater
        // instead. Confirm this runs before relying on it.
        TridentState tridentState = topology.newStream("userSpout", kafkaSpout)
                // MyFunction is defined elsewhere; presumably it splits the raw
                // "str" message into the two order fields - verify there.
                .each(new Fields("str"),
                        new TridentKafkaConsumerTopology.MyFunction(),
                        new Fields(FIELD_ORDER_DATE, FIELD_ORDER_AMT))
                .groupBy(new Fields(FIELD_ORDER_DATE))
                .persistentAggregate(jdbcStateFactory,
                        new Fields(FIELD_ORDER_DATE, FIELD_ORDER_AMT),
                        new MySum(),
                        new Fields("_order_amt"));

        // NOTE(review): no LocalDRPC is attached and the state options carry no
        // select query / lookup mapper, which JdbcQuery presumably needs; the
        // empty output field name also looks unintended - confirm.
        topology.newDRPCStream("str")
                .stateQuery(tridentState, new JdbcQuery(), new Fields(""));

        return topology.build();
    }

    /** Creates the HikariCP-backed connection provider for the MySQL database. */
    private static ConnectionProvider newConnectionProvider() {
        Map<String, Object> hikariConfigMap = Maps.newHashMap();
        hikariConfigMap.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
        hikariConfigMap.put("dataSource.url", "jdbc:mysql://bigdata-pro-m01/storm");
        hikariConfigMap.put("dataSource.user", "root");
        // NOTE(review): credentials are hard-coded; consider externalizing them.
        hikariConfigMap.put("dataSource.password", "199911");
        return new HikariCPConnectionProvider(hikariConfigMap);
    }

    /** Creates the JDBC state factory that maps both order columns as VARCHAR. */
    private static JdbcStateFactory newJdbcStateFactory(ConnectionProvider connectionProvider) {
        List<Column> columnSchema = Lists.newArrayList(
                new Column(FIELD_ORDER_DATE, java.sql.Types.VARCHAR),
                new Column(FIELD_ORDER_AMT, java.sql.Types.VARCHAR));
        JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);

        JdbcState.Options options = new JdbcState.Options()
                .withConnectionProvider(connectionProvider)
                .withMapper(simpleJdbcMapper)
                .withTableName(TABLE_NAME);
        return new JdbcStateFactory(options);
    }

    /**
     * Combiner that sums the {@code order_amt} field as {@code long} values.
     */
    public static class MySum implements CombinerAggregator<Long> {

        /**
         * Parses the tuple's {@code order_amt} string field into a long.
         *
         * @param tuple input tuple carrying an {@code order_amt} string field
         * @return the parsed amount
         * @throws NumberFormatException if the field is not a valid long
         */
        @Override
        public Long init(TridentTuple tuple) {
            return Long.parseLong(tuple.getStringByField(FIELD_ORDER_AMT));
        }

        /**
         * Adds two partial sums.
         *
         * @param val1 first partial sum
         * @param val2 second partial sum
         * @return {@code val1 + val2}
         */
        @Override
        public Long combine(Long val1, Long val2) {
            return val1 + val2;
        }

        /**
         * Returns the identity value for the sum.
         *
         * @return {@code 0L}
         */
        @Override
        public Long zero() {
            return 0L;
        }
    }
}