1.项目介绍
本项目采用的数据为通话记录数据。例如"张三 李四 2021-4-23 12:32:13 2942"表示张三在 2021-4-23 12:32:13 给李四打电话,通话时长为 2942 秒。
- 数据来源:由程序模拟生成通话记录,并通过Kafka生产者发送到Kafka
- Kafka的消费者端接的是HBase数据库
- MapReduce读取HBase中的数据进行分析
- 再将分析的数据导入MySQL
2.各类介绍
Produce模块
- DataProduce:主要负责生产数据
- Main:函数的入口
- testAPI:进行功能测试
- KafkaUtils:将数据发送到topic
Consumer模块
- Main:程序的入口
- HBaseConsumer:消费者拉取数据
- HBaseDao:HBase的客户端对象,创建表导入数据
- HBaseUtils:主要是创建RowKey,还有一些建表和命名空间的操作
Analysis模块
- HashUtils:将每个Cell中的数据存入到HashMap中
- MysqlUtils:主要是Mysql的连接操作
- CountMap:为每条通话记录输出"主叫-被叫"键和计数1,由CountReduce汇总得到每对用户之间的通话次数
- DBWrite:实现了Writable、DBWritable,用于序列化以及写数据库操作
3.项目各模块
项目分为三个模块,分别是produce、consumer、analysis
- produce:实现数据生产
- consumer:从Kafka消费数据并写入HBase
- analysis:利用MapReduce分析数据将结果导入Mysql
2.1 produce
2.1.1 entry
public class Main { public static void main(String[] args) throws ParseException, InterruptedException { //生产数据,发到Kafka KafkaUtils.writeDataIntoKafka(); } }
2.1.2 dataProduce
public String produce(String startTime, String endTime) throws ParseException { // 张三 李四 2021-2-3 13:43:25 1204 initMetaData(); //获得随机下标来获得拨打电话者 int callerIndex = (int) (Math.random() * telePhone.size()); String callerName = phoneToName.get(telePhone.get(callerIndex)); //获得被拔打电话者 int calleeIndex; do { calleeIndex = (int) (Math.random() * telePhone.size()); } while (callerIndex == calleeIndex); String calleeName = phoneToName.get(telePhone.get(calleeIndex)); //定义解析时间的对象 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); //定义起止时间 Date startDate = null; Date endDate = null; //解析传入的时间字符串,将其转化成Date格式 startDate = sdf.parse(startTime); endDate = sdf.parse(endTime); //获得一个时间戳,来初始打电话的时间 long randomTs = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random()); Date resultDate = new Date(randomTs); //将初始化好的Date时间,转化成字符串 String resultTimeString = sdf.format(resultDate); //随机初始化小时、分钟、秒 int hour = (int) (Math.random() * 24); int minute = (int) (Math.random() * 60); int second = (int) (Math.random() * 60); //初始化具体时间,精确到小时、分钟、秒 String specificTime = String.format(String.format("%02d", hour) + ":" + String.format("%02d", minute) + ":" + String.format("%02d", second)); //定义时间跨度,表明电话的拨打时长 int duration = (int) (Math.random() * 3600); //拼接结果 张三 李四 2021-2-3 13:43:25 1204 String result = callerName + " " + calleeName + " " + resultTimeString + " " + specificTime + " " + duration; return result; }
2.1.3 kafkaUtils
public static void writeDataIntoKafka() throws ParseException, InterruptedException { //定义配置对象 Properties properties = new Properties(); //定义主机名 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); //字符串序列化的类 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //Kafka的主题 String topic = "telecom"; //定义一个生产者对象 KafkaProducer producer = new KafkaProducer<String, String>(properties); //循环发送数据到Kafka for (int i = 0; i < 1000; i++) { //按给定起止时间生成数据 String value = dataProduce.produce("2021-1-1", "2021-5-1"); //睡眠1秒 Thread.sleep(1000); //创建ProducerRecord对象 ProducerRecord<String, String> record = new ProducerRecord<>(topic, value); //发送数据 producer.send(record); } //关闭资源 producer.close(); }
2.2 consumer
2.2.1 entry
public class Main { public static void main(String[] args) throws IOException, InterruptedException, ParseException { //创建HBase消费者 HBaseConsumer hBaseConsumer = new HBaseConsumer(); //从Kafka中获取数据输到HBase hBaseConsumer.getDataFromKafka(); } }
2.2.2 hbase
2.2.2.1 HBaseConsumer
public class HBaseConsumer { public void getDataFromKafka() throws InterruptedException, IOException, ParseException { //定义配置对象 Properties properties = new Properties(); //连接主机名 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); //是否自动提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); //自动提交的时间间隔 properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); //消费者组名 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test3"); //字符串序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //创建消费者对象 KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties); //消费者订阅主题 consumer.subscribe(Arrays.asList("telecom")); //创建一个Dao对象,用于上传数据到HBase HBaseDao hBaseDao = new HBaseDao(); //从Kafka拉取数据 while (true) { //拉取的时间间隔 ConsumerRecords<String,String> records = consumer.poll(100); //拉取数据输到HBase for (ConsumerRecord<String,String> record : records) { String value = record.value(); System.out.println(value); Thread.sleep(1000); //上传数据 hBaseDao.put(value); } } } }
2.2.2.2 HBaseDao
public class HBaseDao { //命名空间 private String nameSpace; //表名 private String tableName; //配置对象 public static Configuration conf; //表对象 private Table table; //连接HBase对象 private Connection connection; //解析日期对象 private SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd"); private SimpleDateFormat sdf2 = new SimpleDateFormat("yyyyMMddHHmmss"); //初始化配置对象 static { conf = HBaseConfiguration.create(); } public HBaseDao() throws IOException { nameSpace = "telecom"; tableName = "teleRecord"; connection = ConnectionFactory.createConnection(conf); if (!HBaseUtils.isExistTable(conf, tableName)) { HBaseUtils.initNamespace(conf, nameSpace); HBaseUtils.createTable(conf, tableName, "f1", "f2"); } table = connection.getTable(TableName.valueOf(tableName)); } //将数据导入HBase public void put(String value) throws ParseException, IOException { //将Kafka拉取的数据切分 String[] splitValues = value.split(" "); String caller = splitValues[0]; String callee = splitValues[1]; String buildTime = splitValues[2]; String specificTime = splitValues[3]; String duration = splitValues[4]; //2021-03-21 12:23:04 buildTime = buildTime + " " + specificTime; //20210321122304 用于创建rowKey String buildTimeReplace = sdf2.format(sdf1.parse(buildTime)); //时间戳 String buildTimeTs = String.valueOf(sdf1.parse(buildTime).getTime()); //获得rowKey String rowKey = HBaseUtils.createRowKey(caller, callee, buildTimeReplace, "1", duration); //创建put对象 Put put = new Put(Bytes.toBytes(rowKey)); //添加各列属性 put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("caller"), Bytes.toBytes(caller)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callee"), Bytes.toBytes(callee)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time"), Bytes.toBytes(buildTime)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time_ts"), Bytes.toBytes(buildTimeTs)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("flag"), Bytes.toBytes("1")); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("duration"), Bytes.toBytes(duration)); //添加put 
table.put(put); } }
2.2.3 hbaseUtils
public class HBaseUtils { //判断表是否存在 public static boolean isExistTable(Configuration conf, String tableName) throws IOException { Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); boolean result = admin.tableExists(TableName.valueOf(tableName)); admin.close(); connection.close(); return result; } //判断命名空间是否存在 public static boolean isExistTableSpace(Configuration conf, String nameSpace) throws IOException { Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); boolean result = false; admin.close(); connection.close(); return result; } //创建命名空间 public static void initNamespace(Configuration conf, String nameSpace) throws IOException { Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); try { NamespaceDescriptor descriptor = NamespaceDescriptor.create(nameSpace).build(); admin.createNamespace(descriptor); } catch (NamespaceExistException e) { } finally { admin.close(); connection.close(); } } //创建表 public static void createTable(Configuration conf, String tableName, String... cfs) throws IOException { Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName)); for (String cf : cfs) { hTableDescriptor.addFamily(new HColumnDescriptor(cf)); } admin.createTable(hTableDescriptor); admin.close(); connection.close(); } //创建RowKey public static String createRowKey(String caller, String callee, String buildTime, String flag, String duration) { StringBuilder rowKey = new StringBuilder(); rowKey.append(caller + "_") .append(buildTime + "_") .append(callee + "_") .append(flag + "_") .append(duration); return rowKey.toString(); } }
2.3 analysis
2.3.1 hashUtils
public class HashUtils { public static void putValue(Cell cell, HashMap<String, String> hashMap) { //获取cell中的列名 String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell)); //获取每列的值 String value = Bytes.toString(CellUtil.cloneValue(cell)); //存入map hashMap.put(qualifier, value); } }
2.3.2 mysqlUtils
/**
 * Holds a shared MySQL connection and offers a helper to empty a table.
 */
public class MysqlUtils {
    // Shared connection; stays null when the static initialiser fails.
    public static Connection connection;
    public static String userName = "root";
    public static String passwd = "123456";
    // Retained for backward compatibility; deleteData no longer stores its
    // statement here because the statement is closed before the method returns.
    public static PreparedStatement ps = null;

    // Load the driver and open the shared connection. Failures are only
    // reported here; deleteData checks for a missing connection.
    static {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        try {
            connection = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/mydb"
                            + "?useSSL=false"
                            + "&allowPublicKeyRetrieval=true"
                            + "&serverTimezone=UTC",
                    userName, passwd);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Deletes every row of the given table.
     *
     * @param tableName table to empty; may contain only letters, digits and
     *                  underscores because it is inserted into the SQL text
     * @throws IllegalArgumentException if {@code tableName} is null or contains
     *                                  any other character
     * @throws SQLException             if no connection is available or the
     *                                  delete fails
     */
    public static void deleteData(String tableName) throws SQLException {
        // Identifiers cannot be bound as statement parameters, so the name is
        // restricted before it is concatenated into the statement.
        if (tableName == null || !tableName.matches("[A-Za-z0-9_]+")) {
            throw new IllegalArgumentException("Invalid table name: " + tableName);
        }
        if (connection == null) {
            throw new SQLException("No MySQL connection is available; initialisation failed");
        }
        try (PreparedStatement statement = connection.prepareStatement("delete from " + tableName)) {
            statement.executeUpdate();
        }
    }
}
2.3.3 hbaseToMR
2.3.3.1 callCount
2.3.3.1.1 Map
public class CountMap extends TableMapper<Text, IntWritable> { //输出 张三 1 @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { HashMap<String, String> hashMap = new HashMap<>(); for (Cell cell : value.rawCells()) { HashUtils.putValue(cell, hashMap); } String caller = hashMap.get("caller"); String callee = hashMap.get("callee"); context.write(new Text(caller + "-" + callee), new IntWritable(1)); } }
2.3.3.1.2 Reduce
public class CountReduce extends Reducer<Text, IntWritable, DBWrite, NullWritable> { //输出 张三 23 @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable value : values) { count += 1; } context.write(new DBWrite(key.toString(), count), NullWritable.get()); } }
2.3.3.1.3 Driver
public class CountDriver implements Tool { //配置对象 public static Configuration conf = null; //Mysql数据库表名 public static String mysqlTableName = "callcounts"; //Mysql表中列名 public static String[] fieldNames = {"callercallee", "counts"}; //Mysql驱动类 public static String driverClass = "com.mysql.cj.jdbc.Driver"; //Mysql的URL public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" + "?useSSL=false" + "&allowPublicKeyRetrieval=true" + "&serverTimezone=UTC"; //Mysql的用户名 public static String userName = "root"; //Mysql的用户密码 public static String passwd = "123456"; @Override public int run(String[] strings) throws Exception { //配置Mysql DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd); //清空表 MysqlUtils.deleteData(mysqlTableName); //获得job对象 Job job = Job.getInstance(conf); //关联Jar job.setJarByClass(CountDriver.class); //配置MapperJob TableMapReduceUtil.initTableMapperJob("teleRecord", new Scan(), CountMap.class, Text.class, IntWritable.class, job); //关联Reduce类 job.setReducerClass(CountReduce.class); job.setOutputKeyClass(DBWrite.class); job.setOutputValueClass(NullWritable.class); //设置输出类型 job.setOutputFormatClass(DBOutputFormat.class); DBOutputFormat.setOutput(job, mysqlTableName, fieldNames); //提交job任务 boolean result = job.waitForCompletion(true); return result ? 0 : 1; } @Override public void setConf(Configuration configuration) { conf = configuration; conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104"); } @Override public Configuration getConf() { return conf; } public static void main(String[] args) { Configuration conf = new Configuration(); try { int run = ToolRunner.run(conf, new CountDriver(), args); System.exit(run); } catch (Exception e) { e.printStackTrace(); } } }
2.3.3.1.4 DBWrite
/**
 * Output record holding a text key and an integer count. It can be serialised
 * through Hadoop's {@code Writable} and written to or read from a database
 * through {@code DBWritable}.
 */
public class DBWrite implements Writable, DBWritable {
    // Text key of the row.
    String caller_callee = "";
    // Count stored with the key.
    int count = 0;

    /** Creates an empty record. */
    public DBWrite() {
    }

    /**
     * Creates a record with the given key and count.
     *
     * @param caller_callee text key
     * @param count         count for that key
     */
    public DBWrite(String caller_callee, int count) {
        this.caller_callee = caller_callee;
        this.count = count;
    }

    /** Serialises the key followed by the count. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.caller_callee);
        out.writeInt(this.count);
    }

    /** Restores the key and the count in the order they were written. */
    @Override
    public void readFields(DataInput in) throws IOException {
        String key = in.readUTF();
        int total = in.readInt();
        this.caller_callee = key;
        this.count = total;
    }

    /** Binds the key to parameter 1 and the count to parameter 2. */
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, this.caller_callee);
        preparedStatement.setInt(2, this.count);
    }

    /** Loads the key from column 1 and the count from column 2. */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        String key = resultSet.getString(1);
        int total = resultSet.getInt(2);
        this.caller_callee = key;
        this.count = total;
    }
}
2.3.3.2 callerDuration
2.3.3.2.1 Map
public class DurationMap extends TableMapper<Text, LongWritable> { //输出 张三 2041 @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { //创建HashMap对象,为了下面取出对应值用 HashMap<String, String> hashMap = new HashMap<>(); //迭代rowkey对应的每个单元 for (Cell cell : value.rawCells()) { HashUtils.putValue(cell, hashMap); } //获得电话发起人 String caller = hashMap.get("caller"); //获得每次电话时长 String duration = hashMap.get("duration"); //写出 context.write(new Text(caller), new LongWritable(Long.valueOf(duration))); } }
2.3.3.2.2 Reduce
public class DurationReduce extends Reducer<Text, LongWritable, DBWrite, NullWritable> { //输出 张三 4204 @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { //存储每个人拨打电话的总时长 long sum = 0; //迭代每个时长 for (LongWritable value : values) { sum += value.get(); } //将结果写出 context.write(new DBWrite(key.toString(), String.valueOf(sum)), NullWritable.get()); } }
2.3.3.2.3 Driver
public class DurationDriver implements Tool { //配置对象 public static Configuration conf = null; //Mysql数据库表名 public static String mysqlTableName = "callerdurations"; //Mysql表中列名 public static String[] fieldNames = {"caller", "durations"}; //Mysql驱动类 public static String driverClass = "com.mysql.cj.jdbc.Driver"; //Mysql的URL public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" + "?useSSL=false" + "&allowPublicKeyRetrieval=true" + "&serverTimezone=UTC"; //Mysql的用户名 public static String userName = "root"; //Mysql的用户密码 public static String passwd = "123456"; @Override public int run(String[] strings) throws Exception { //配置Mysql DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd); //清空表 MysqlUtils.deleteData(mysqlTableName); //获得job对象 Job job = Job.getInstance(conf); //关联Jar job.setJarByClass(DurationDriver.class); //配置MapperJob TableMapReduceUtil.initTableMapperJob("teleRecord", new Scan(), DurationMap.class, Text.class, LongWritable.class, job); //关联Reduce类 job.setReducerClass(DurationReduce.class); job.setOutputKeyClass(DBWrite.class); job.setOutputValueClass(NullWritable.class); //设置输出类型 job.setOutputFormatClass(DBOutputFormat.class); DBOutputFormat.setOutput(job, mysqlTableName, fieldNames); //提交job任务 boolean result = job.waitForCompletion(true); return result ? 0 : 1; } @Override public void setConf(Configuration configuration) { conf = configuration; conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104"); } @Override public Configuration getConf() { return conf; } public static void main(String[] args) { Configuration conf = new Configuration(); try { int run = ToolRunner.run(conf, new DurationDriver(), args); System.exit(run); } catch (Exception e) { e.printStackTrace(); } } }
2.3.3.3 dayCountDuration
2.3.3.3.1 Map
public class dayCountDurationMap extends TableMapper<Text, LongWritable> { //2021-01-13 3042 @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { HashMap<String, String> hashmap = new HashMap<>(); for (Cell cell : value.rawCells()) { HashUtils.putValue(cell, hashmap); } String date = hashmap.get("build_time").substring(0, 10); String duration = hashmap.get("duration"); context.write(new Text(date), new LongWritable(Long.valueOf(duration))); } }
2.3.3.3.2 Reduce
public class dayCountDurationReduce extends Reducer<Text, LongWritable, DBWrite, NullWritable> { //输出 2021-01-13 2042 @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long durations = 0; for (LongWritable value : values) { durations += value.get(); } context.write(new DBWrite(key.toString(), durations), NullWritable.get()); } }
2.3.3.3.3 Driver
public class dayCountDurationDriver implements Tool { //配置对象 public static Configuration conf = null; //Mysql数据库表名 public static String mysqlTableName = "daydurations"; //Mysql表中列名 public static String[] fieldNames = {"date", "durations"}; //Mysql驱动类 public static String driverClass = "com.mysql.cj.jdbc.Driver"; //Mysql的URL public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" + "?useSSL=false" + "&allowPublicKeyRetrieval=true" + "&serverTimezone=UTC"; //Mysql的用户名 public static String userName = "root"; //Mysql的用户密码 public static String passwd = "123456"; @Override public int run(String[] strings) throws Exception { //配置Mysql DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd); //清空表 MysqlUtils.deleteData(mysqlTableName); //获得job对象 Job job = Job.getInstance(conf); //关联Jar job.setJarByClass(dayCountDurationDriver.class); //配置MapperJob TableMapReduceUtil.initTableMapperJob("teleRecord", new Scan(), dayCountDurationMap.class, Text.class, LongWritable.class, job); //关联Reduce类 job.setReducerClass(dayCountDurationReduce.class); job.setOutputKeyClass(DBWrite.class); job.setOutputValueClass(NullWritable.class); //设置输出类型 job.setOutputFormatClass(DBOutputFormat.class); DBOutputFormat.setOutput(job, mysqlTableName, fieldNames); //提交job任务 boolean result = job.waitForCompletion(true); return result ? 0 : 1; } @Override public void setConf(Configuration configuration) { conf = configuration; conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104"); } @Override public Configuration getConf() { return conf; } public static void main(String[] args) { Configuration conf = new Configuration(); try { int run = ToolRunner.run(conf, new dayCountDurationDriver(), args); System.exit(run); } catch (Exception e) { e.printStackTrace(); } } }