Table of Contents
1. Configure the Flume configuration file
2. Bring up the data collection pipeline
2.1 Start the ZooKeeper cluster
2.2 Start the Kafka cluster
2.3 Start the Flume agent
2.4 Produce data
3. Prepare the data consumption environment
3.1 Add the Maven configuration
3.2 Add the cluster configuration files
4. Data consumption utility classes
4.1 PropertiesUtil: read the configured parameters
4.2 ConnectionInstance: instantiate a connection object
5. Consume data with the Kafka API
5.1 Receive data produced on the cluster with the local Kafka API
6. Save the Kafka data to HBase
6.1 HBaseUtil: namespace
6.2 Check for and create tables
6.3 Write split keys
1. Configure the Flume configuration file
First, create the Flume configuration file (the run command in 2.3 expects it under Flume's conf directory):
# 1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 2 source (-c +0 means read the file from the beginning)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/jars/calllog.csv
a1.sources.r1.shell = /bin/bash -c

# 3 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hou-01:9092,hou-02:9092,hou-03:9092
a1.sinks.k1.topic = calllog
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1

# 4 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 5 bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Bring up the data collection pipeline
2.1 Start the ZooKeeper cluster
2.1.1 Start ZooKeeper
zkServer.sh start    # run on every node
2.2 Start the Kafka cluster
2.2.1 Start the cluster
2.2.2 Create the topic
2.2.3 Start a Kafka console consumer
Kafka commands:
2.2.1 Start the Kafka cluster, using my own install paths (the ZooKeeper cluster must be running first):
/root/soft/kafka/bin/kafka-server-start.sh /root/soft/kafka/config/server.properties &

2.2.2 Create the topic:
/root/soft/kafka/bin/kafka-topics.sh --zookeeper hou-01:2181 --topic calllog --create --replication-factor 1 --partitions 3

Delete a topic:
/root/soft/kafka/bin/kafka-topics.sh --zookeeper bigdata11:2181 --delete --topic calllog

List all topics:
/root/soft/kafka/bin/kafka-topics.sh --zookeeper hou-01:2181 --list

2.2.3 Start a Kafka console consumer:
/root/soft/kafka/bin/kafka-console-consumer.sh --zookeeper hou-01:2181 --topic calllog --from-beginning

Describe a consumer group:
bin/kafka-consumer-groups.sh --zookeeper bigdata11:2181 --group console-consumer-30191 --describe

Start a Flume agent:
bin/flume-ng agent --conf conf --conf-file jobs/kafkaToflume.conf --name agent -Dflume.root.logger=INFO,console
2.3 Start the Flume agent
2.3.1 Run the command
/root/soft/apache-flume-1.6.0-bin/bin/flume-ng agent --conf /root/soft/apache-flume-1.6.0-bin/conf/ --name a1 --conf-file /root/soft/apache-flume-1.6.0-bin/conf/flume-kafka.c
2.4 Produce data
2.4.1 Run the script
sh produceData.sh
The shell script:
[root@hou-01 jars]# cat produceData.sh
#!/bin/bash
java -cp /root/jars/ct_producer-1.0-SNAPSHOT.jar producer.ProductLog /root/jars/calllog.csv
3. Prepare the data consumption environment
Write an API that uses HBase to receive the data from Kafka.
3.1 Add the Maven configuration
Check your own Kafka version and use the matching dependency versions.
[Screenshot: Kafka download page showing the available Scala/Kafka versions]
2.10 is the Scala version; 0.8.1.1 is the Kafka version.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>Telecom</artifactId>
        <groupId>com.itstar</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>ct_consumer</artifactId>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.8.1.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
3.2 Add the cluster configuration files
3.2.1
hadoop/etc/hadoop/core-site.xml
hadoop/etc/hadoop/hdfs-site.xml
hbase/conf/hbase-site.xml
hbase/conf/log4j.properties
3.2.2
On Windows, edit the hosts file to add the cluster hostname mappings:
C:\Windows\System32\drivers\etc\hosts
[Screenshot: Windows hosts file with the cluster hostname mappings]
3.2.3 Create hbase_consumer.properties
Its contents are as follows:
# Set the Kafka broker list
bootstrap.servers=bigdata11:9092,bigdata12:9092,bigdata13:9092
# Set the consumer group this consumer belongs to
group.id=hbase_consumer_group
# Whether to auto-commit offsets
enable.auto.commit=true
# Interval between automatic offset commits (ms)
auto.commit.interval.ms=30000
# Fully qualified class names of the key/value deserializers
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Custom properties below
# Topic consumed by this job
kafka.topics=calllog
# HBase-related settings
hbase.calllog.regions=6
hbase.calllog.namespace=ns_ct
hbase.calllog.tablename=ns_ct:calllog
4. Data consumption utility classes
4.1 PropertiesUtil: read the configured parameters
package utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Reads parameters from the properties file
public class PropertiesUtil {

    public static Properties properties = null;

    static {
        // Load the configuration file from the classpath so the settings are easy to maintain
        InputStream is = ClassLoader.getSystemResourceAsStream("hbase_consumer.properties");
        properties = new Properties();
        try {
            properties.load(is);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Get a parameter value.
     * @param key parameter name
     * @return parameter value
     */
    public static String getProperty(String key) {
        return properties.getProperty(key);
    }
}
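With this utility, any class in the project can pull the custom settings defined in 3.2.3. The snippet below is only an illustrative sketch (the class name PropertiesUtilDemo is not part of the original project):

package utils;

// Illustrative sketch (not part of the original project): reading the custom
// properties defined in hbase_consumer.properties through PropertiesUtil.
public class PropertiesUtilDemo {
    public static void main(String[] args) {
        String topic = PropertiesUtil.getProperty("kafka.topics");                            // calllog
        String tableName = PropertiesUtil.getProperty("hbase.calllog.tablename");             // ns_ct:calllog
        int regions = Integer.parseInt(PropertiesUtil.getProperty("hbase.calllog.regions"));  // 6
        System.out.println(topic + " -> " + tableName + " (" + regions + " regions)");
    }
}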
4.2 ConnectionInstance: instantiate a connection object
package utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

// Caches a single shared HBase Connection for the whole application
public class ConnectionInstance {

    private static Connection conn;

    public static synchronized Connection getConnection(Configuration configuration) {
        try {
            // Create the connection only if there is none yet or the cached one was closed
            if (conn == null || conn.isClosed()) {
                conn = ConnectionFactory.createConnection(configuration);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return conn;
    }
}
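A minimal usage sketch, assuming the Hadoop/HBase configuration files copied into the project in 3.2 are on the classpath (the class name ConnectionInstanceDemo is only for illustration): HBaseConfiguration.create() loads hbase-site.xml, and the resulting Configuration is passed to getConnection, which caches one shared Connection.

package utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;

// Hypothetical usage sketch: obtain the shared HBase connection once and reuse it
public class ConnectionInstanceDemo {
    public static void main(String[] args) throws Exception {
        // Reads hbase-site.xml (and core-site.xml / hdfs-site.xml) from the classpath
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionInstance.getConnection(conf);
        System.out.println("HBase connection closed? " + connection.isClosed());
    }
}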
5. Consume data with the Kafka API
5.1 Receive data produced on the cluster with the local Kafka API
package kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import utils.PropertiesUtil;

import java.util.Arrays;

public class HBaseConsumer {

    public static void main(String[] args) {
        // Build the consumer from the settings loaded out of hbase_consumer.properties
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(PropertiesUtil.properties);
        // Subscribe to the topic configured under kafka.topics (calllog)
        kafkaConsumer.subscribe(Arrays.asList(PropertiesUtil.getProperty("kafka.topics")));

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> cr : records) {
                String orivalue = cr.value();
                System.out.println(orivalue);
            }
        }
    }
}
6. Save the Kafka data to HBase
6.1 HBaseUtil: namespace
6.2 Check for and create tables
6.3 Write split keys
package utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.Iterator;
import java.util.TreeSet;

/**
 * 1. initNameSpace ====> namespace
 * 2. createTable   ====> table
 * 3. isExistTable  ====> check whether a table exists
 * 4. regions / rowkeys / split keys
 */
public class HbaseUtil {

    /**
     * Initialize the namespace.
     * @param conf      configuration object
     * @param namespace namespace name
     */
    public static void initNameSpace(Configuration conf, String namespace) throws Exception {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        // Namespace descriptor
        NamespaceDescriptor nd = NamespaceDescriptor
                .create(namespace)
                .addConfiguration("AUTHOR", "Yuwen")
                .build();
        // Create the namespace through the admin object
        admin.createNamespace(nd);
        // Close both objects
        close(admin, connection);
    }

    // Close the Admin and Connection objects
    private static void close(Admin admin, Connection connection) throws IOException {
        if (admin != null) {
            admin.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Create an HBase table.
     * @param conf         configuration object
     * @param tableName    table name
     * @param regions      number of regions
     * @param columnFamily column families
     */
    public static void createTable(Configuration conf, String tableName, int regions, String... columnFamily) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        // If the table already exists, do nothing
        if (isExistTable(conf, tableName)) {
            close(admin, connection);
            return;
        }
        // Table descriptor
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
        for (String cf : columnFamily) {
            // Column family descriptor
            htd.addFamily(new HColumnDescriptor(cf));
        }
        // Create the table with pre-split regions
        admin.createTable(htd, genSplitKeys(regions));
        // Close the objects
        close(admin, connection);
    }

    /**
     * Generate the split keys.
     * @param regions number of regions
     * @return split keys
     */
    private static byte[][] genSplitKeys(int regions) {
        // Array holding the split keys
        String[] keys = new String[regions];
        // Format the split keys as 00| 01| 02| ...
        // ('|' has a high ASCII value, so rowkeys starting with a two-digit prefix sort before the matching split key)
        DecimalFormat df = new DecimalFormat("00");
        for (int i = 0; i < regions; i++) {
            keys[i] = df.format(i) + "|";
        }
        byte[][] splitKeys = new byte[regions][];
        // Sort so the split keys are in order
        TreeSet<byte[]> treeSet = new TreeSet<>(Bytes.BYTES_COMPARATOR);
        for (int i = 0; i < regions; i++) {
            treeSet.add(Bytes.toBytes(keys[i]));
        }
        // Copy the sorted keys into the result array
        Iterator<byte[]> iterator = treeSet.iterator();
        int index = 0;
        while (iterator.hasNext()) {
            splitKeys[index++] = iterator.next();
        }
        return splitKeys;
    }

    /**
     * Check whether a table exists.
     * @param conf      configuration object
     * @param tableName table name
     */
    public static boolean isExistTable(Configuration conf, String tableName) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        boolean result = admin.tableExists(TableName.valueOf(tableName));
        close(admin, connection);
        return result;
    }
}
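Sections 5 and 6 can then be tied together by creating the namespace and the pre-split table once, before the consumer's poll loop starts, using the values from hbase_consumer.properties. The sketch below is only an assumption of how that wiring might look (the class name HBaseInitDemo and the column family "f1" are illustrative, not part of the original code):

package kafka;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import utils.HbaseUtil;
import utils.PropertiesUtil;

// Hypothetical sketch: create the namespace and pre-split table defined in hbase_consumer.properties
public class HBaseInitDemo {
    public static void main(String[] args) throws Exception {
        // Picks up the hbase-site.xml copied into the project in 3.2
        Configuration conf = HBaseConfiguration.create();

        String namespace = PropertiesUtil.getProperty("hbase.calllog.namespace");            // ns_ct
        String tableName = PropertiesUtil.getProperty("hbase.calllog.tablename");            // ns_ct:calllog
        int regions = Integer.parseInt(PropertiesUtil.getProperty("hbase.calllog.regions")); // 6

        // Create the namespace (throws if it already exists), then the table pre-split into 6 regions.
        // "f1" is a placeholder column family name used only for this sketch.
        HbaseUtil.initNameSpace(conf, namespace);
        HbaseUtil.createTable(conf, tableName, regions, "f1");
    }
}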