package bigdata.itcast.cn.momo.offline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
/**
 * @ClassName MomoKafkaToHbase
 * @Description TODO Offline scenario: consume data from Kafka and write it into HBase
 * @Create By Maynor
 */
public class MomoKafkaToHbase {
    private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static Connection conn;
    private static Table table;
    private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG"); //table name
    private static byte[] family = Bytes.toBytes("C1"); //column family
    //todo:2-Build the HBase connection
    //Static block: runs once when the class is loaded, so only one connection is built and performance is not hurt by creating several
    static {
        try {
            //Build the configuration object
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
            //Build the connection
            conn = ConnectionFactory.createConnection(conf);
            //Get the table object
            table = conn.getTable(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
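    //Optional cleanup sketch (an addition for illustration, not part of the original code):
    //register a JVM shutdown hook so the HBase table and connection are released when the
    //process is terminated (e.g. Ctrl+C), since the consume loop below never exits normally.
    static {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                if (table != null) table.close();
                if (conn != null) conn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }));
    }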
    public static void main(String[] args) throws Exception {
        //todo:1-Build the consumer and consume the data
        consumerKafkaToHbase();
        // String momoRowkey = getMomoRowkey("2020-08-13 12:30:00", "13071949728", "17719988692");
        // System.out.println(momoRowkey);
    }
    /**
     * Consumes data from Kafka and writes valid messages into HBase
     */
    private static void consumerKafkaToHbase() throws Exception {
        //Build the configuration object
        Properties props = new Properties();
        //Specify the broker addresses
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        //Specify the consumer group id
        props.setProperty("group.id", "momo1");
        //Disable auto commit
        props.setProperty("enable.auto.commit", "false");
        //Specify the deserializer types for K and V
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //Build the consumer connection
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //Specify which topics to subscribe to
        consumer.subscribe(Arrays.asList("MOMO_MSG"));
        //Keep pulling data
        while (true) {
            //Request data from Kafka and wait up to 100ms for a response; if nothing arrives within 100ms, issue the next request
            //All records pulled in this round come back in one ConsumerRecords object, which behaves like a collection of KV pairs
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            //todo:3-Process the pulled data: print it
            //Process the data partition by partition
            Set<TopicPartition> partitions = records.partitions();//all partitions present in this batch
            //Handle each partition's data
            for (TopicPartition partition : partitions) {
                List<ConsumerRecord<String, String>> partRecords = records.records(partition);//all records of this partition
                //Process this partition's records
                long offset = 0;
                for (ConsumerRecord<String, String> record : partRecords) {
                    //Get the topic
                    String topic = record.topic();
                    //Get the partition
                    int part = record.partition();
                    //Get the offset
                    offset = record.offset();
                    //Get the key
                    String key = record.key();
                    //Get the value
                    String value = record.value();
                    System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);
                    //Write the value into HBase if it is a valid 20-field message
                    if (value != null && !"".equals(value) && value.split("\001").length == 20) {
                        writeToHbase(value);
                    }
                }
                //Manually commit this partition's offset (offset + 1: the position of the next record to consume)
                Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1));
                consumer.commitSync(offsets);
            }
        }
    }
    /**
     * Writes a single message into HBase
     * @param value the raw message line, fields separated by \001
     */
    private static void writeToHbase(String value) throws Exception {
        //todo:3-Write into HBase
        //Split the message into fields
        String[] items = value.split("\001");
        String stime = items[0];
        String sender_accounter = items[2];
        String receiver_accounter = items[11];
        //Build the rowkey
        String rowkey = getMomoRowkey(stime, sender_accounter, receiver_accounter);
        //Build the Put
        Put put = new Put(Bytes.toBytes(rowkey));
        //Add the columns (all under the C1 column family declared above)
        put.addColumn(family, Bytes.toBytes("msg_time"), Bytes.toBytes(items[0]));
        put.addColumn(family, Bytes.toBytes("sender_nickyname"), Bytes.toBytes(items[1]));
        put.addColumn(family, Bytes.toBytes("sender_account"), Bytes.toBytes(items[2]));
        put.addColumn(family, Bytes.toBytes("sender_sex"), Bytes.toBytes(items[3]));
        put.addColumn(family, Bytes.toBytes("sender_ip"), Bytes.toBytes(items[4]));
        put.addColumn(family, Bytes.toBytes("sender_os"), Bytes.toBytes(items[5]));
        put.addColumn(family, Bytes.toBytes("sender_phone_type"), Bytes.toBytes(items[6]));
        put.addColumn(family, Bytes.toBytes("sender_network"), Bytes.toBytes(items[7]));
        put.addColumn(family, Bytes.toBytes("sender_gps"), Bytes.toBytes(items[8]));
        put.addColumn(family, Bytes.toBytes("receiver_nickyname"), Bytes.toBytes(items[9]));
        put.addColumn(family, Bytes.toBytes("receiver_ip"), Bytes.toBytes(items[10]));
        put.addColumn(family, Bytes.toBytes("receiver_account"), Bytes.toBytes(items[11]));
        put.addColumn(family, Bytes.toBytes("receiver_os"), Bytes.toBytes(items[12]));
        put.addColumn(family, Bytes.toBytes("receiver_phone_type"), Bytes.toBytes(items[13]));
        put.addColumn(family, Bytes.toBytes("receiver_network"), Bytes.toBytes(items[14]));
        put.addColumn(family, Bytes.toBytes("receiver_gps"), Bytes.toBytes(items[15]));
        put.addColumn(family, Bytes.toBytes("receiver_sex"), Bytes.toBytes(items[16]));
        put.addColumn(family, Bytes.toBytes("msg_type"), Bytes.toBytes(items[17]));
        put.addColumn(family, Bytes.toBytes("distance"), Bytes.toBytes(items[18]));
        put.addColumn(family, Bytes.toBytes("message"), Bytes.toBytes(items[19]));
        //Execute the write
        table.put(put);
    }
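    /**
     * Sketch of an alternative write path (an assumption for illustration, not part of the
     * original flow): for higher throughput, Puts could be buffered through a BufferedMutator
     * instead of calling table.put() once per record. Only the conn/tableName fields defined
     * above are used; the method name writeToHbaseBuffered is hypothetical.
     */
    private static void writeToHbaseBuffered(List<Put> puts) throws IOException {
        try (org.apache.hadoop.hbase.client.BufferedMutator mutator = conn.getBufferedMutator(tableName)) {
            //buffer the whole batch of Puts; the buffer is flushed when the mutator is closed
            mutator.mutate(puts);
        }
    }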
    /**
     * Builds the rowkey from the message time, sender account and receiver account
     * @param stime message time, formatted as yyyy-MM-dd HH:mm:ss
     * @param sender_accounter sender account id
     * @param receiver_accounter receiver account id
     * @return the rowkey: an 8-character MD5 prefix followed by sender_receiver_timestamp
     * @throws Exception if the time string cannot be parsed
     */
    private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {
        //Convert the time string into an epoch timestamp
        long time = format.parse(stime).getTime();
        String suffix = sender_accounter + "_" + receiver_accounter + "_" + time;
        //Build the 8-character MD5 prefix
        String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0, 8);
        //Concatenate and return
        return prefix + "_" + suffix;
    }
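    //Note on the rowkey layout produced by getMomoRowkey (derived from the code above):
    //  <8-char MD5 prefix>_<sender account>_<receiver account>_<epoch millis>
    //The MD5 prefix salts otherwise time-ordered keys so that writes are spread across
    //HBase regions instead of hot-spotting on a single region.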
}