Creating Linux Virtual Machines with VMware (Part 5): Fully Distributed Spark Deployment Tutorial
Hello! In the blink of an eye it is the end of 2022 and the end of the semester... All in all, I have been through a lot this year and genuinely grown and matured. All I can say is: here's hoping tomorrow stays bright!!! 🐒
Tools used in this post: VMware 16 and Xftp 7. If you are not comfortable with the command line, a CentOS 7 VM with a GUI desktop is recommended; that is what I use for the demonstrations.
A virtual machine (VM) is a complete computer system with full hardware functionality, emulated in software and running in a fully isolated environment. Anything you can do on a physical machine can be done inside a VM. When a VM is created, part of the physical machine's disk space and memory is allocated to it; each VM has its own CMOS, disk and operating system and can be operated just like a physical machine.
[Make sure the server cluster from the earlier parts has already been installed and configured!] See my previous posts on CSDN (Vim_飞鱼的博客):
VMware创建Linux虚拟机之(一)实现免密登录
VMware创建Linux虚拟机之(二)下载安装JDK与配置Java环境变量
VMware创建Linux虚拟机之(三)Hadoop安装与配置及搭建集群
VMware创建Linux虚拟机之(四)ZooKeeper&HBase完全分布式安装
Preface
Adapt the steps below to your own environment as needed. My three CentOS 7 servers are:
master (192.168.149.101)
slave1 (192.168.149.102)
slave2 (192.168.149.103)
Installation and configuration are identical on every node. In practice you normally finish installing and configuring on the master node and then copy the installation directory to the other nodes with scp.
Note: every operation below is performed with root privileges, so log in as the root user.
Only passion can carry you through the long years; only passion leaves you unafraid of life's uncertainty!
Spark is the new-generation, and now mainstream, compute engine that succeeded MapReduce, so it is well worth learning properly. This post walks through deploying a distributed Spark cluster; later posts will cover Spark's core modules and their internals. Enough talk, let's get started!
Download the Spark package
For this deployment we use Spark 3.1.2 pre-built for Hadoop 3.2, available from the official site:
Apache Spark™ - Unified Engine for large-scale data analytics  https://spark.apache.org/
Extract the Spark package
First make sure the network is configured, then upload spark-3.1.2-bin-hadoop3.2.tgz into the /opt/ directory with Xftp or a similar tool (a USB drive and drag-and-drop also work). Once the upload finishes, run the following on the master host:
Extract Spark
cd /opt/
tar -zxvf spark-3.1.2-bin-hadoop3.2.tgz
Extraction creates a spark-3.1.2-bin-hadoop3.2 subdirectory under /opt; the rest of this post uses /opt/spark-3.1.2, so rename the directory to match (mv spark-3.1.2-bin-hadoop3.2 spark-3.1.2). Note: you can use ls to confirm that the files were extracted correctly.
Configure the bashrc file (it serves the same purpose as profile)
#Spark
export SPARK_HOME=/opt/spark-3.1.2
export PATH=$SPARK_HOME/bin:$PATH
Do this on all three VMs, then run source /etc/bashrc (or open a new shell) so the variables take effect.
Edit the spark-env.sh file
[root@master conf]# cp spark-env.sh.template spark-env.sh
[root@master conf]# vim spark-env.sh
[root@master conf]#
Add the following lines:
export JAVA_HOME=/opt/jdk1.8.0_261
export HADOOP_HOME=/opt/hadoop/hadoop
export SPARK_MASTER_IP=master
export SPARK_MASTER_PORT=7077
export SPARK_DIST_CLASSPATH=$(/opt/hadoop/hadoop/bin/hadoop classpath)
export HADOOP_CONF_DIR=/opt/hadoop/hadoop/etc/hadoop
export SPARK_YARN_USER_ENV="CLASSPATH=/opt/hadoop/hadoop/etc/hadoop"
export YARN_CONF_DIR=/opt/hadoop/hadoop/etc/hadoop
Also make sure conf/workers (named slaves before Spark 3.0) lists the hosts that should run Workers; the startup log below assumes it contains master, slave1 and slave2.
Copy Spark to the other two nodes
[root@master conf]# scp -r /opt/spark-3.1.2/ slave1:/opt/
[root@master conf]# scp -r /opt/spark-3.1.2/ slave2:/opt/
Start the Spark cluster and check the processes
master:
[root@master opt]# /opt/spark-3.1.2/sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/spark-3.1.2/logs/spark-root-org.apache.spark.deploy.master.Master-1-master.out
slave1: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-3.1.2/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slave1.out
slave2: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-3.1.2/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slave2.out
master: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-3.1.2/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-master.out
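Besides the jps checks below, the standalone Master serves a web UI on port 8080 by default (each Worker on 8081) that lists every registered Worker. A quick command-line sketch, assuming the hostname master used above and that curl is installed:
# expect 200 once the Master is up; open http://master:8080 in a browser to see the registered Workers
curl -s -o /dev/null -w '%{http_code}\n' http://master:8080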
[root@master opt]# jps
3860 Worker
3677 Master
3886 Jps
slave1:
[root@slave1 ~]# jps
1749 Worker
1806 Jps
[root@slave1 ~]#
slave2:
[root@slave2 ~]# jps
3667 Jps
3590 Worker
[root@slave2 ~]#
Launch Spark to check that it starts correctly
Start local mode: spark-shell --master local
[root@master hadoop]# cd /opt/spark-3.1.2/conf/
[root@master conf]# spark-shell --master local
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/spark-3.1.2/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/hadoop/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2022-12-21 22:01:54,437 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://leader:4040
Spark context available as 'sc' (master = local, app id = local-1671631329749).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.2
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_261)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
As this example shows, our Spark component is now fully deployed! 💪 So, did you get all that?
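Local mode only exercises the binaries on a single machine. To exercise the standalone cluster you just started, you can submit the SparkPi example that ships with the distribution to the Master; a minimal sketch (the master URL follows the spark-env.sh settings above, and the examples jar path matches a stock 3.1.2 download; adjust if yours differs):
/opt/spark-3.1.2/bin/spark-submit \
  --master spark://master:7077 \
  --class org.apache.spark.examples.SparkPi \
  /opt/spark-3.1.2/examples/jars/spark-examples_2.12-3.1.2.jar 100
# a successful run prints a line like "Pi is roughly 3.14..." and the finished application appears on the Master web UI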
Creating Linux Virtual Machines with VMware (Part 4): Fully Distributed ZooKeeper & HBase Installation (second half)
Fully distributed HBase installation
Download the HBase package
Apache HBase – Apache HBase Downloads  https://hbase.apache.org/downloads.html
Upload it to the master VM and extract HBase
cd /opt/
tar -zxvf hbase-2.3.3.tar.gz
#Adjust permissions
sudo chmod -R 777 /opt/hbase-2.3.3
Configure the environment variables
[root@master ~]# vim /etc/bashrc
#HBase
export HBASE_HOME=/opt/hbase-2.3.3
export PATH=$PATH:$HBASE_HOME/bin
#Apply the configuration
[root@master ~]# source /etc/bashrc
Do this on all three VMs. You can then run hbase -version to check that the environment variables are correct:
[root@master ~]# hbase -version
java version "1.8.0_261"
Java(TM) SE Runtime Environment (build 1.8.0_261-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.261-b12, mixed mode)
[root@master ~]#
Configure the HBase files
Edit the hbase-env.sh file under hbase-2.3.3/conf
[root@master ~]# cd /opt/hbase-2.3.3/conf
[root@master ~]# vim hbase-env.sh
#!/usr/bin/env bash
# Where log files are stored. $HBASE_HOME/logs by default.
# export HBASE_LOG_DIR=${HBASE_HOME}/logs
# Enable remote JDWP debugging of major HBase processes. Meant for Core Developers
# export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8070"
# export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8071"
# export HBASE_THRIFT_OPTS="$HBASE_THRIFT_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8072"
# export HBASE_ZOOKEEPER_OPTS="$HBASE_ZOOKEEPER_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8073"
# export HBASE_REST_OPTS="$HBASE_REST_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8074"
# A string representing this instance of hbase. $USER by default.
# export HBASE_IDENT_STRING=$USER
# The scheduling priority for daemon processes. See 'man nice'.
# export HBASE_NICENESS=10
# The directory where pid files are stored. /tmp by default.
export HBASE_PID_DIR=/opt/hadoop/hadoop/pids
# Seconds to sleep between slave commands. Unset by default. This
# can be useful in large clusters, where, e.g., slave rsyncs can
# otherwise arrive faster than the master can service them.
# export HBASE_SLAVE_SLEEP=0.1
# Tell HBase whether it should manage it's own instance of ZooKeeper or not.
# export HBASE_MANAGES_ZK=true
# The default log rolling policy is RFA, where the log file is rolled as per the size defined for the
# RFA appender. Please refer to the log4j.properties file to see more details on this appender.
# In case one needs to do log rolling on a date change, one should set the environment property
# HBASE_ROOT_LOGGER to "<DESIRED_LOG LEVEL>,DRFA".
# For example:
# HBASE_ROOT_LOGGER=INFO,DRFA
# The reason for changing default to RFA is to avoid the boundary case of filling out disk space as
# DRFA doesn't put any cap on the log size. Please refer to HBase-5655 for more context.
# Tell HBase whether it should include Hadoop's lib when start up,
# the default value is false,means that includes Hadoop's lib.
export HBASE_DISABLE_HADOOP_CLASSPATH_LOOKUP="true"
# Override text processing tools for use by these launch scripts.
# export GREP="${GREP-grep}"
# export SED="${SED-sed}"
export JAVA_HOME=/opt/jdk1.8.0_261
export HBASE_HOME=/opt/hbase-2.3.3
export HBASE_MANAGES_ZK=false
Here HBASE_MANAGES_ZK=false tells HBase to use the ZooKeeper cluster we installed ourselves rather than the one bundled with HBase.
Edit the hbase-site.xml file under hbase-2.3.3/conf
[root@master ~]# vim hbase-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://master:9000/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.tmp.dir</name>
<value>./tmp</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>master,slave1,slave2</value>
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/opt/zookeeper-3.6.2</value>
</property>
<property>
<name>zookeeper.session.timeout</name>
<value>300000</value> <!-- default: 180000; ZooKeeper session timeout in milliseconds -->
</property>
<property>
<name>hbase.master.maxclockskew</name>
<value>30000</value>
</property>
</configuration>
Add the IPs or hostnames of the HBase cluster servers: vi regionservers
[root@master ~]# vim regionservers
master
slave1
slave2
Copy the HBase directory above to the other two servers
scp -r /opt/hbase-2.3.3 root@slave1:/opt
scp -r /opt/hbase-2.3.3 root@slave2:/opt
That completes the copy.
Start HBase
Once HBase is configured and all of the files above have been copied to the other two servers, start the HBase cluster with the start-hbase.sh command:
[root@master ~]# cd /opt/hbase-2.3.3/bin/
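# Two quick sanity checks before starting HBase (a sketch; paths follow the earlier posts in this series):
# hbase.rootdir must point at the same NameNode address as fs.defaultFS in Hadoop's core-site.xml,
# and with HBASE_MANAGES_ZK=false the external ZooKeeper ensemble must already be running on every node.
[root@master bin]# hdfs getconf -confKey fs.defaultFS     # expect hdfs://master:9000, matching hbase.rootdir
[root@master bin]# /opt/zookeeper-3.6.2/bin/zkServer.sh status     # expect Mode: leader or Mode: follower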
[root@master bin]# ls
considerAsDead.sh hbase-config.cmd master-backup.sh start-hbase.sh
draining_servers.rb hbase-config.sh region_mover.rb stop-hbase.cmd
get-active-master.rb hbase-daemon.sh regionservers.sh stop-hbase.sh
graceful_stop.sh hbase-daemons.sh region_status.rb test
hbase hbase-jruby replication tmp
hbase-cleanup.sh hirb.rb rolling-restart.sh zookeepers.sh
hbase.cmd local-master-backup.sh shutdown_regionserver.rb
hbase-common.sh local-regionservers.sh start-hbase.cmd
[root@master bin]# start-hbase.sh
running master, logging to /opt/hbase-2.3.3/logs/hbase-root-master-master.out
slave1: running regionserver, logging to /opt/hbase-2.3.3/logs/hbase-root-regionserver-slave1.out
slave2: running regionserver, logging to /opt/hbase-2.3.3/logs/hbase-root-regionserver-slave2.out
master: running regionserver, logging to /opt/hbase-2.3.3/logs/hbase-root-regionserver-master.out
[root@master bin]#
Whichever server you run the command above on becomes the master node. Use the jps command to check what started:
[root@master bin]# jps
7281 ResourceManager
8450 HMaster
6965 SecondaryNameNode
6535 NameNode
8619 HRegionServer
7693 QuorumPeerMain
8943 Jps
slave1
[root@slave1 bin]# jps
2180 QuorumPeerMain
2509 Jps
1919 DataNode
2351 HRegionServer
[root@slave1 bin]#
slave2
[root@slave2 ~]# jps
3441 QuorumPeerMain
3875 HRegionServer
4040 Jps
3165 DataNode
[root@slave2 ~]#
As you can see, server 1 is running the HMaster and HRegionServer processes, while servers 2 and 3 each run an HRegionServer process. And with that, it's done!!! A problem that troubled me for a month is finally solved!!! 🙇 Of course, you can also check the state of the HBase cluster through the web UI at http://IP:16010. Keep going (ง •_•)ง
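As a final smoke test, you can drive the HBase shell non-interactively from bash to create, write and scan a throwaway table; a minimal sketch (the table and column-family names are arbitrary examples):
hbase shell <<'EOF'
status
create 'smoke_test', 'cf'
put 'smoke_test', 'row1', 'cf:c1', 'hello'
scan 'smoke_test'
disable 'smoke_test'
drop 'smoke_test'
EOF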
Kafka-HBase-MapReduce-Mysql Integration in Practice: Call Records
1. Project overview
The project works with call-record data, for example (张三 李四 2021-4-23 12:32:13 2942), meaning that 张三 called 李四 at 2021-4-23 12:32:13 and the call lasted 2942 seconds.
Data source: the program simulates the data itself and hands it to a Kafka producer.
The Kafka consumer side feeds the HBase database.
MapReduce reads the data from HBase, analyses it, and loads the results into MySQL.
2. Class overview
Produce module
DataProduce: generates the data
Main: program entry point
testAPI: functional tests
KafkaUtils: sends the data to the topic
Consumer module
Main: program entry point
HBaseConsumer: the consumer that pulls the data
HBaseDao: HBase client object; creates the table and loads the data
HBaseUtils: mainly builds the RowKey, plus some table and namespace helpers
Analysis module
HashUtils: stores the data of each Cell in a HashMap
MysqlUtils: MySQL connection helpers
CountMap: counts the number of calls between each pair of users
DBWrite: implements Writable and DBWritable for serialization and for writing to the database
3. Project modules
The project is split into three modules: produce, consumer and analysis.
produce: generates the data
consumer: Kafka writes the data into HBase
analysis: MapReduce analyses the data and loads the results into MySQL
2.1 produce
2.1.1 entry
public class Main {
public static void main(String[] args) throws ParseException, InterruptedException {
// Generate data and send it to Kafka
KafkaUtils.writeDataIntoKafka();
}
}
2.1.2 dataProduce
public String produce(String startTime, String endTime) throws ParseException {
// 张三 李四 2021-2-3 13:43:25 1204
initMetaData();
// Pick a random index to choose the caller
int callerIndex = (int) (Math.random() * telePhone.size());
String callerName = phoneToName.get(telePhone.get(callerIndex));
// Pick the callee (must differ from the caller)
int calleeIndex;
do {
calleeIndex = (int) (Math.random() * telePhone.size());
} while (callerIndex == calleeIndex);
String calleeName = phoneToName.get(telePhone.get(calleeIndex));
// Date format used to parse the time strings
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
// Start and end dates
Date startDate = null;
Date endDate = null;
// Parse the incoming time strings into Date objects
startDate = sdf.parse(startTime);
endDate = sdf.parse(endTime);
// Pick a random timestamp for when the call starts
long randomTs = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTs);
// Convert the generated Date back into a string
String resultTimeString = sdf.format(resultDate);
// Randomly generate the hour, minute and second
int hour = (int) (Math.random() * 24);
int minute = (int) (Math.random() * 60);
int second = (int) (Math.random() * 60);
// Build the time of day, down to hour, minute and second
String specificTime = String.format(String.format("%02d", hour) + ":"
+ String.format("%02d", minute) + ":"
+ String.format("%02d", second));
// Call duration in seconds
int duration = (int) (Math.random() * 3600);
// Assemble the result, e.g. 张三 李四 2021-2-3 13:43:25 1204
String result = callerName + " " + calleeName + " " + resultTimeString + " " + specificTime + " " + duration;
return result;
}
2.1.3 kafkaUtils
public static void writeDataIntoKafka() throws ParseException, InterruptedException {
// Producer configuration
Properties properties = new Properties();
// Broker address
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// String serializer classes
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// Kafka topic
String topic = "telecom";
// Create the producer
KafkaProducer producer = new KafkaProducer<String, String>(properties);
// Send records to Kafka in a loop
for (int i = 0; i < 1000; i++) {
// Generate a record within the given date range
String value = dataProduce.produce("2021-1-1", "2021-5-1");
// Sleep for one second
Thread.sleep(1000);
// Create the ProducerRecord
ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
// Send the record
producer.send(record);
}
// Release resources
producer.close();
}
2.2 consumer
2.2.1 entry
public class Main {
public static void main(String[] args) throws IOException, InterruptedException, ParseException {
// Create the HBase consumer
HBaseConsumer hBaseConsumer = new HBaseConsumer();
// Pull data from Kafka and write it into HBase
hBaseConsumer.getDataFromKafka();
}
}
2.2.2 hbase
2.2.2.1 HBaseConsumer
public class HBaseConsumer {
public void getDataFromKafka() throws InterruptedException, IOException, ParseException {
// Consumer configuration
Properties properties = new Properties();
// Broker address
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// Auto-commit offsets
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// Auto-commit interval
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
// Consumer group id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test3");
// String deserializers
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// Create the consumer
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
// Subscribe to the topic
consumer.subscribe(Arrays.asList("telecom"));
// DAO object used to write the data into HBase
HBaseDao hBaseDao = new HBaseDao();
// Poll data from Kafka
while (true) {
// Poll interval (timeout)
ConsumerRecords<String,String> records = consumer.poll(100);
// Write each pulled record into HBase
for (ConsumerRecord<String,String> record : records) {
String value = record.value();
System.out.println(value);
Thread.sleep(1000);
// Upload the record
hBaseDao.put(value);
}
}
}
}
2.2.2.2 HBaseDao
public class HBaseDao {
// Namespace
private String nameSpace;
// Table name
private String tableName;
// Configuration
public static Configuration conf;
// Table handle
private Table table;
// HBase connection
private Connection connection;
// Date formats
private SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
private SimpleDateFormat sdf2 = new SimpleDateFormat("yyyyMMddHHmmss");
// Initialize the configuration
static {
conf = HBaseConfiguration.create();
}
public HBaseDao() throws IOException {
nameSpace = "telecom";
tableName = "teleRecord";
connection = ConnectionFactory.createConnection(conf);
if (!HBaseUtils.isExistTable(conf, tableName)) {
HBaseUtils.initNamespace(conf, nameSpace);
HBaseUtils.createTable(conf, tableName, "f1", "f2");
}
table = connection.getTable(TableName.valueOf(tableName));
}
// Write one record into HBase
public void put(String value) throws ParseException, IOException {
// Split the record pulled from Kafka
String[] splitValues = value.split(" ");
String caller = splitValues[0];
String callee = splitValues[1];
String buildTime = splitValues[2];
String specificTime = splitValues[3];
String duration = splitValues[4];
//2021-03-21 12:23:04
buildTime = buildTime + " " + specificTime;
// 20210321122304, used to build the rowKey
String buildTimeReplace = sdf2.format(sdf1.parse(buildTime));
// Timestamp
String buildTimeTs = String.valueOf(sdf1.parse(buildTime).getTime());
// Build the rowKey
String rowKey = HBaseUtils.createRowKey(caller, callee, buildTimeReplace, "1", duration);
// Create the Put
Put put = new Put(Bytes.toBytes(rowKey));
// Add each column
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("caller"), Bytes.toBytes(caller));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callee"), Bytes.toBytes(callee));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time"), Bytes.toBytes(buildTime));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time_ts"), Bytes.toBytes(buildTimeTs));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("flag"), Bytes.toBytes("1"));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("duration"), Bytes.toBytes(duration));
// Execute the put
table.put(put);
}
}
2.2.3 hbaseUtils
public class HBaseUtils {
// Check whether the table exists
public static boolean isExistTable(Configuration conf, String tableName) throws IOException {
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
boolean result = admin.tableExists(TableName.valueOf(tableName));
admin.close();
connection.close();
return result;
}
// Check whether the namespace exists
public static boolean isExistTableSpace(Configuration conf, String nameSpace) throws IOException {
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
boolean result = false;
admin.close();
connection.close();
return result;
}
// Create the namespace
public static void initNamespace(Configuration conf, String nameSpace) throws IOException {
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
try {
NamespaceDescriptor descriptor = NamespaceDescriptor.create(nameSpace).build();
admin.createNamespace(descriptor);
} catch (NamespaceExistException e) {
} finally {
admin.close();
connection.close();
}
}
// Create the table
public static void createTable(Configuration conf, String tableName, String... cfs) throws IOException {
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
for (String cf : cfs) {
hTableDescriptor.addFamily(new HColumnDescriptor(cf));
}
admin.createTable(hTableDescriptor);
admin.close();
connection.close();
}
// Build the RowKey
public static String createRowKey(String caller, String callee, String buildTime, String flag, String duration) {
StringBuilder rowKey = new StringBuilder();
rowKey.append(caller + "_")
.append(buildTime + "_")
.append(callee + "_")
.append(flag + "_")
.append(duration);
return rowKey.toString();
}
}
2.3 analysis
2.3.1 hashUtils
public class HashUtils {
public static void putValue(Cell cell, HashMap<String, String> hashMap) {
// Column qualifier of the cell
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
// Value of the cell
String value = Bytes.toString(CellUtil.cloneValue(cell));
// Store in the map
hashMap.put(qualifier, value);
}
}
2.3.2 mysqlUtils
public class MysqlUtils {
public static Connection connection;
public static String userName = "root";
public static String passwd = "123456";
public static PreparedStatement ps = null;
// Obtain the Connection object
static {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
try {
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb" +
"?useSSL=false" +
"&allowPublicKeyRetrieval=true" +
"&serverTimezone=UTC",
userName,
passwd);
} catch (SQLException e) {
e.printStackTrace();
}
}
// Clear the table
public static void deleteData(String tableName) throws SQLException {
String sql = String.format("delete from %s", tableName);
ps = connection.prepareStatement(sql);
ps.executeUpdate();
}
}
2.3.3 hbaseToMR
2.3.3.1 callCount
2.3.3.1.1 Map
public class CountMap extends TableMapper<Text, IntWritable> {
// Output: e.g. 张三-李四 1
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
HashMap<String, String> hashMap = new HashMap<>();
for (Cell cell : value.rawCells()) {
HashUtils.putValue(cell, hashMap);
}
String caller = hashMap.get("caller");
String callee = hashMap.get("callee");
context.write(new Text(caller + "-" + callee), new IntWritable(1));
}
}
2.3.3.1.2 Reduce
public class CountReduce extends Reducer<Text, IntWritable, DBWrite, NullWritable> {
// Output: e.g. 张三-李四 23
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count += 1;
}
context.write(new DBWrite(key.toString(), count), NullWritable.get());
}
}
2.3.3.1.3 Driver
public class CountDriver implements Tool {
// Configuration
public static Configuration conf = null;
// MySQL table name
public static String mysqlTableName = "callcounts";
// MySQL column names
public static String[] fieldNames = {"callercallee", "counts"};
// MySQL driver class
public static String driverClass = "com.mysql.cj.jdbc.Driver";
// MySQL URL
public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" +
"?useSSL=false" +
"&allowPublicKeyRetrieval=true" +
"&serverTimezone=UTC";
// MySQL user name
public static String userName = "root";
// MySQL password
public static String passwd = "123456";
@Override
public int run(String[] strings) throws Exception {
// Configure MySQL
DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
// Clear the table
MysqlUtils.deleteData(mysqlTableName);
// Get the Job
Job job = Job.getInstance(conf);
// Set the jar
job.setJarByClass(CountDriver.class);
// Configure the mapper job
TableMapReduceUtil.initTableMapperJob("teleRecord",
new Scan(),
CountMap.class,
Text.class,
IntWritable.class,
job);
// Set the reducer class
job.setReducerClass(CountReduce.class);
job.setOutputKeyClass(DBWrite.class);
job.setOutputValueClass(NullWritable.class);
// Set the output format
job.setOutputFormatClass(DBOutputFormat.class);
DBOutputFormat.setOutput(job, mysqlTableName, fieldNames);
// Submit the job
boolean result = job.waitForCompletion(true);
return result ? 0 : 1;
}
@Override
public void setConf(Configuration configuration) {
conf = configuration;
conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
}
@Override
public Configuration getConf() {
return conf;
}
public static void main(String[] args) {
Configuration conf = new Configuration();
try {
int run = ToolRunner.run(conf, new CountDriver(), args);
System.exit(run);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.3.3.1.4 DBWrite
public class DBWrite implements Writable, DBWritable {
String caller_callee = "";
int count = 0;
public DBWrite(){}
public DBWrite(String caller_callee, int count){
this.caller_callee=caller_callee;
this.count=count;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(caller_callee);
out.writeInt(count);
}
@Override
public void readFields(DataInput in) throws IOException {
this.caller_callee = in.readUTF();
this.count = in.readInt();
}
@Override
public void write(PreparedStatement preparedStatement) throws SQLException {
preparedStatement.setString(1, caller_callee);
preparedStatement.setInt(2, count);
}
@Override
public void readFields(ResultSet resultSet) throws SQLException {
this.caller_callee = resultSet.getString(1);
this.count = resultSet.getInt(2);
}
}
2.3.3.2 callerDuration
2.3.3.2.1 Map
public class DurationMap extends TableMapper<Text, LongWritable> {
// Output: e.g. 张三 2041
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
// HashMap used to look up the values below
HashMap<String, String> hashMap = new HashMap<>();
// Iterate over every cell of this rowkey
for (Cell cell : value.rawCells()) {
HashUtils.putValue(cell, hashMap);
}
// The caller
String caller = hashMap.get("caller");
// Duration of this call
String duration = hashMap.get("duration");
// Emit
context.write(new Text(caller), new LongWritable(Long.valueOf(duration)));
}
}
2.3.3.2.2 Reduce
public class DurationReduce extends Reducer<Text, LongWritable, DBWrite, NullWritable> {
// Output: e.g. 张三 4204
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
// Total call duration for this caller
long sum = 0;
// Sum the durations
for (LongWritable value : values) {
sum += value.get();
}
// Emit the result
context.write(new DBWrite(key.toString(), String.valueOf(sum)), NullWritable.get());
}
}
2.3.3.2.3 Driver
public class DurationDriver implements Tool {
// Configuration
public static Configuration conf = null;
// MySQL table name
public static String mysqlTableName = "callerdurations";
// MySQL column names
public static String[] fieldNames = {"caller", "durations"};
// MySQL driver class
public static String driverClass = "com.mysql.cj.jdbc.Driver";
// MySQL URL
public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" +
"?useSSL=false" +
"&allowPublicKeyRetrieval=true" +
"&serverTimezone=UTC";
// MySQL user name
public static String userName = "root";
// MySQL password
public static String passwd = "123456";
@Override
public int run(String[] strings) throws Exception {
// Configure MySQL
DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
// Clear the table
MysqlUtils.deleteData(mysqlTableName);
// Get the Job
Job job = Job.getInstance(conf);
// Set the jar
job.setJarByClass(DurationDriver.class);
// Configure the mapper job
TableMapReduceUtil.initTableMapperJob("teleRecord",
new Scan(),
DurationMap.class,
Text.class,
LongWritable.class,
job);
// Set the reducer class
job.setReducerClass(DurationReduce.class);
job.setOutputKeyClass(DBWrite.class);
job.setOutputValueClass(NullWritable.class);
// Set the output format
job.setOutputFormatClass(DBOutputFormat.class);
DBOutputFormat.setOutput(job, mysqlTableName, fieldNames);
// Submit the job
boolean result = job.waitForCompletion(true);
return result ? 0 : 1;
}
@Override
public void setConf(Configuration configuration) {
conf = configuration;
conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
}
@Override
public Configuration getConf() {
return conf;
}
public static void main(String[] args) {
Configuration conf = new Configuration();
try {
int run = ToolRunner.run(conf, new DurationDriver(), args);
System.exit(run);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.3.3.3 dayCountDuration
2.3.3.3.1 Map
public class dayCountDurationMap extends TableMapper<Text, LongWritable> {
//2021-01-13 3042
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
HashMap<String, String> hashmap = new HashMap<>();
for (Cell cell : value.rawCells()) {
HashUtils.putValue(cell, hashmap);
}
String date = hashmap.get("build_time").substring(0, 10);
String duration = hashmap.get("duration");
context.write(new Text(date), new LongWritable(Long.valueOf(duration)));
}
}
2.3.3.3.2 Reduce
public class dayCountDurationReduce extends Reducer<Text, LongWritable, DBWrite, NullWritable> {
// Output: e.g. 2021-01-13 2042
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long durations = 0;
for (LongWritable value : values) {
durations += value.get();
}
context.write(new DBWrite(key.toString(), durations), NullWritable.get());
}
}
2.3.3.3.3 Driver
public class dayCountDurationDriver implements Tool {
// Configuration
public static Configuration conf = null;
// MySQL table name
public static String mysqlTableName = "daydurations";
// MySQL column names
public static String[] fieldNames = {"date", "durations"};
// MySQL driver class
public static String driverClass = "com.mysql.cj.jdbc.Driver";
// MySQL URL
public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" +
"?useSSL=false" +
"&allowPublicKeyRetrieval=true" +
"&serverTimezone=UTC";
// MySQL user name
public static String userName = "root";
// MySQL password
public static String passwd = "123456";
@Override
public int run(String[] strings) throws Exception {
// Configure MySQL
DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
// Clear the table
MysqlUtils.deleteData(mysqlTableName);
// Get the Job
Job job = Job.getInstance(conf);
// Set the jar
job.setJarByClass(dayCountDurationDriver.class);
// Configure the mapper job
TableMapReduceUtil.initTableMapperJob("teleRecord",
new Scan(),
dayCountDurationMap.class,
Text.class,
LongWritable.class,
job);
// Set the reducer class
job.setReducerClass(dayCountDurationReduce.class);
job.setOutputKeyClass(DBWrite.class);
job.setOutputValueClass(NullWritable.class);
// Set the output format
job.setOutputFormatClass(DBOutputFormat.class);
DBOutputFormat.setOutput(job, mysqlTableName, fieldNames);
// Submit the job
boolean result = job.waitForCompletion(true);
return result ? 0 : 1;
}
@Override
public void setConf(Configuration configuration) {
conf = configuration;
conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
}
@Override
public Configuration getConf() {
return conf;
}
public static void main(String[] args) {
Configuration conf = new Configuration();
try {
int run = ToolRunner.run(conf, new dayCountDurationDriver(), args);
System.exit(run);
} catch (Exception e) {
e.printStackTrace();
}
}
}
4 Project source code
GitHub link
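For reference, here is a rough sketch of the supporting steps the code above assumes but the post does not show: creating the telecom topic, creating the MySQL tables that the three drivers write into, and submitting a driver to the cluster. The topic name, broker address, database name and table/column names come from the code; the column types, jar name and package are placeholders to adapt to your own build:
# create the Kafka topic the producer writes to (--bootstrap-server needs Kafka 2.2+; older clients use --zookeeper)
kafka-topics.sh --create --topic telecom --bootstrap-server hadoop102:9092 --partitions 1 --replication-factor 1
# create the MySQL tables used by DBOutputFormat (it inserts rows but does not create tables)
mysql -uroot -p123456 <<'EOF'
CREATE DATABASE IF NOT EXISTS mydb;
USE mydb;
CREATE TABLE IF NOT EXISTS callcounts (callercallee VARCHAR(64), counts INT);
CREATE TABLE IF NOT EXISTS callerdurations (caller VARCHAR(32), durations BIGINT);
CREATE TABLE IF NOT EXISTS daydurations (`date` VARCHAR(16), durations BIGINT);
EOF
# submit one of the analysis drivers (jar name and package are placeholders)
hadoop jar telecom-analysis.jar com.example.analysis.CountDriver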