首页> 标签> Hbase
"Hbase"
共 2661 条结果
全部 问答 文章 公开课 课程 电子书 技术圈 体验
【HBase】(6)-Compact合并StoreFile流程
简 介:HBase是谷歌BigData论文的一个代码实现,在大数据处理领域应用广泛。本文意在记录自己近期学习过程中的所学所得,如有错误,欢迎大家指正。 关键词:大数据组件、HBase、NoSQL一、Compact合并StoreFile文件我们每次put一条数据,都会将其写入到MemStore中,然后达到一定条件后,将其flush到HDFS中,久而久之这样就会产生过多的小文件,我们就需要定期或者按照一定约束进行合并,产生的多个StoreFile如下图:此时可以看到有两个HFile文件,对应着MemStore刷写两次到HDFS中。 由于我们每个字段的不同版本可能在不同的HFile中,当我们查询时就要去遍历所有的HFile,拿到最新的data,这样效率就会较低,所以需要减少HFile的个数,清理掉过期不生效的数据,进行StoreFile Compaction合并。从上图可以看出,合并分为两种,一种是Minor Compaction,另外一种是Major Compaction。Minor Compaction:它会将临近的若干个较小的HFile合并成一个较大的HFile,但是它并不会清理过期和删除的数据。Major Compaction:会将一个Store下的所有的HFile合并成一个大HFile,并且会清理掉过期和删除的数据。这些定期合并以及相关的配置可以在配置文件中进行修改。下面测试一下合并文件。上图我们的HFile文件中有两个,我们尝试put两条新的数据,然后flush到磁盘中。现在可以发现HDFS上存在4个HFile文件,我们用命令手动合并一下。现在会比之前多一个文件,就是刚才的4个文件合并后的,但是并没有马上删除其它4个文件,可能是延迟或者是一种安全措施,并不会马上删除,过了一段时间还是会被删除的。
文章
NoSQL  ·  大数据  ·  分布式数据库  ·  Hbase
2023-01-18
【HBase】(5)-HBase读数据流程
简 介:HBase是谷歌BigData论文的一个代码实现,在大数据处理领域应用广泛。本文意在记录自己近期学习过程中的所学所得,如有错误,欢迎大家指正。 关键词:大数据组件、HBase、NoSQL一、HBase读数据流程 HBase读数据流程:首先从zookeeper中获得meta表所在的RegionServer然后从meta表中获取待查询的表的Region所在的RegionServer,将查询到的信息放到Meta Cache缓存中,便于后续查找与查询到的RegionServer进行通信从该RegionServer中的MemStore和StoreFile(内存和磁盘)分别进行读取数据将从内存和磁盘读取到的数据进行合并,什么是合并就是有可能内存和磁盘中的数据是两个版本,或者可能同样类型的数据然后时间戳不一样,需要将两个地方读取到的数据进行对比,返回最新时间戳的数据然后将合并后的数据缓存到Block Cache,以便于后续查询将合并后的数据返回给客户端1.我们测试一下,首先创建一个stu表:2.然后向stu表中插入一条数据我们发现此时HDFS中的info列族中还没有数据,是因为此时数据还在内存中,没有被刷写,我们可以进行手动刷写3.将插入的数据进行刷写发现手动刷写后就可以在HDFS中发现了数据,就是将MemStore中的数据刷写成StoreFile4.再插入一条时间戳小于刚才的数据插入之前先扫描一下刚才插入的时间戳然后进行插入一条时间戳小的数据那么现在张三对应的数据此时在磁盘中,而新插入的李四仍在内存中,按照常理想,我们会认为进行读取内存,也就是会读到李四也就是会读到李四[外链图片转存中…(img-JVVIIENC-1624367770630)]但是发现现在扫描仍然是张三,这就说明默认读取的不是内存,真正的读取就是将内存和磁盘中的数据都进行读取,然后进行时间戳比较,返回最新的时间戳数据,而不是读取内存中的,然后将读取到的进行合并返回给客户端,然后将该数据同时存储到Block Cache中,方便后续的查找。
文章
存储  ·  缓存  ·  NoSQL  ·  大数据  ·  分布式数据库  ·  Hbase
2023-01-18
【HBase】(4)-HBase写数据流程
 简 介:HBase是谷歌BigData论文的一个代码实现,在大数据处理领域应用广泛。本文意在记录自己近期学习过程中的所学所得,如有错误,欢迎大家指正。 关键词:大数据组件、HBase、NoSQL一、HBase的写流程客户端向HBase写入数据分为几个流程:获取meta表所在的的RegionServer,也就是说客户端需要访问Zookeeper获得meta表所在的服务器,meta表中维护者我们HBase中所有表的位置,我们要获得我们要写的表的所在位置向维护meta表的RegionServer访问,获得meta表从meta表中获得我们目标写入数据的表所在的RegionServer向目标表所在的RegionServer进行发送请求向memstore进行写入数据然后向wal中写入预写日志,记录数据操作向客户端发送ack,证明put成功,此时客户端就不用管任何事了,之后的flush刷写操作就全部交给RegionServer来进行管理。有个地方需要注意就是当我们向memstore写入数据时,如果发生异常,就是触发回滚机制,将刚才写入的数据进行清空,如果写入成功,我们再次向wal中写入日志。 我们看到zookeeper目录下有个hbase,hbase下面有个meta-region-server,查看后可以发现它是存在hadoop103上面,我们访问下16010端口查看一下是不是 用命令行扫描一下这个表,看一下这个表中到底存在着什么数据我们可以看到有一列就是info:server,它的值就存着该表所在的RegionServer,每个rowkey就是表名加上切分该表的rowkey范围rver,每个rowkey就是表名加上切分该表的rowkey范围现在看上面是每一张表只对应一个RegionServer,原因是表现在比较小,还没有达到切分条件,如果达到一定阈值就会进行切分,再次扫描该表就会看到每张表对应不同的RegionServer。
文章
NoSQL  ·  大数据  ·  分布式数据库  ·  Hbase
2023-01-18
【HBase】(3)-HBase详细结构图
 简 介:HBase是谷歌BigData论文的一个代码实现,在大数据处理领域应用广泛。本文意在记录自己近期学习过程中的所学所得,如有错误,欢迎大家指正。 关键词:大数据组件、HBase、NoSQL一、HBase详细架构图 从上面的图可以看出,整个HBase组件是建立在HDFS的基础之上,利用zookeeper进行管理整个集群集群中存在一个HMaster和多个HRegionServer,HMaster是用来进行与zookeeper进行通信,管理整个HRegionServer的负载均衡,调整HRegion的分配。如果说当前的HMaster死掉之后,剩余的机器会进行抢夺机会,谁拿到了谁就会成为下一个HMaster。二、组件HLog:每个HRegionServer可以看到里面维护着一个HLog,HLog的作用就是说将一系列的写操作进行保存,如果某一时刻服务器宕机,我们可以再次读取HLog中的操作进行数据还原,重新将数据写回HDFS。HRegion:它可以理解为每张表的一部分,因为HBase是用来存储大数据的,所以我们的表一定会很大,如果表过于大不便于操作,效率较低,所以需要将大表进行切分,切分成多个HRegion,可以说包含多个rowKey的数据。Store:HBase是NoSQL数据库,它不像mysql这种有着强烈的列关系,它是以列族进行区分,每个列族下包含着多个列,可以说每个Store就代表着一个列族。Mem Store:它是内存缓冲区,我们在像HBase中写入数据时,有时会发现数据不能够实时的写入HDFS中,但是此时是可以扫描到的,原因是此时数据被写入到内存,还没有被刷写到HDFS系统中,只有当Mem Store中的数据达到一定阈值时,才会触发向HDFS中写数据的事件。StoreFile:上面说Mem Store会一定时机地刷写一次数据,那么每一次刷写就会形成一个StoreFile文件,刷写多次就会形成多个文件,而StoreFile的地层是以HFile进行存储。文件,而StoreFile的地层是以HFile进行存储。HFile:HFile是一种数据存储格式,StoreFile就是以该形式进行存储,它是一种以keyValue的形式进行存储。
文章
存储  ·  负载均衡  ·  NoSQL  ·  大数据  ·  关系型数据库  ·  MySQL  ·  分布式数据库  ·  数据库  ·  Hbase
2023-01-18
【HBase】(2)-集群的启动与关闭
简 介:HBase是谷歌BigData论文的一个代码实现,在大数据处理领域应用广泛。本文意在记录自己近期学习过程中的所学所得,如有错误,欢迎大家指正。 关键词:大数据组件、HBase、NoSQL HBase的运转是需要HDFS支持的,而且需要zookeeper进行管理Master,所以我们开启HBase服务之前要开启这两个服务。1.开启HDFS# 注意这里,因为hbase不需要进行计算,所以可以不用开yarn sbin/start-dfs.sh2.开启zookeeper# 这里我是将开启zk写成了脚本 zk.sh start 下面是我的群起zookeeper脚本,可以参考进行修改自己的路径。#!/bin/bash case $1 in "start"){ for i in hadoop102 hadoop103 hadoop104 do echo "----------启动$i zk----------" ssh $i "source /etc/profile;nohup /opt/module/zookeeper-3.4.10/bin/zkServer.sh start" done };; "stop"){ for i in hadoop102 hadoop103 hadoop104 do echo "----------关闭$i zk----------" ssh $i "source /etc/profile;nohup /opt/module/zookeeper-3.4.10/bin/zkServer.sh stop" done };; "status"){ for i in hadoop102 hadoop103 hadoop104 do echo "----------查看$i zk----------" ssh $i "source /etc/profile;nohup /opt/module/zookeeper-3.4.10/bin/zkServer.sh status" done };; esac3.群起HBasebin/start-hbase.sh 我们可以看到启动了两个进程,分别是Master和HRegionServer。访问16010端口,可以看到集群全部已经启动成功。有时候发现HRegionServer明明启动了,然后马上就死掉了,或者直接就启动不起来,这种原因可能是集群的时间不同步,检查以下集群的时间,我们测试一下:====我们首先将其中一个服务器的时间进行修改,与其它集群不同步。我们尝试进行群起HBase,发现该服务器起不来我们进入该服务器的hbase的日志目录下,查看下日志发现:上面说明当前服务器的时间与其它服务器的时间大于允许最大通信时间,导致不能启动,其实我们可以进行配置该参数,即最大时长,但是如果过长的话没有什么意义,所以我们此时需要将集群的时间进行同步。将集群时间同步之后就发现服务器又能够启动起来了。
文章
NoSQL  ·  大数据  ·  分布式数据库  ·  Hbase
2023-01-18
【HBase】(1)-HBase的安装
简 介:HBase是谷歌BigData论文的一个代码实现,在大数据处理领域应用广泛。本文意在记录自己近期学习过程中的所学所得,如有错误,欢迎大家指正。 关键词:大数据组件、HBase、NoSQL1.解压HBase安装包tar -zxvf hbase-1.3.1-bin.tar.gz -C /opt2.修改配置文件1.hbase-env.sh# 配置jdk export JAVA_HOME=/opt/module/jdk1.8.0_144 # 将HBase内置的zk置为false,不使用hbase内置的zk,否则会出现问题,使用自己的zk可以很方便的查询相关数据 export HBASE_MANAGES_ZK=false2.hbase-site.sh<configuration> <property> <!--将HBase存储到HDFS的哪个目录下--> <name>hbase.rootdir</name> <value>hdfs://hadoop102:9000/HBase</value> </property> <property> <!--配置是否配置分布式集群,如果为false就是单机模式--> <name>hbase.cluster.distributed</name> <value>true</value> </property> <property> <!-- 0.98 后的新变动,之前版本没有.port,默认端口为 60000 --> <name>hbase.master.port</name> <value>16000</value> </property> <property> <!--配置HBase的集群--> <name>hbase.zookeeper.quorum</name> <value>hadoop102,hadoop103,hadoop104</value> </property> <property> <!--配置HBase对应的zookeeper目录--> <name>hbase.zookeeper.property.dataDir</name> <value>/opt/module/zookeeper-3.4.10/zkData</value> </property> </configuration>3.regionservers# 配置对应的集群,分配regionservers node1 node2 node33.将需要的HDFS文件链接到HBase目录下ln -s /opt/module/hadoop-2.7.2/etc/hadoop/core-site.xml /opt/module/hbase/conf/core-site.xml ln -s /opt/module/hadoop-2.7.2/etc/hadoop/hdfs-site.xml /opt/module/hbase/conf/hdfs-site.xml
文章
NoSQL  ·  大数据  ·  分布式数据库  ·  Hbase
2023-01-18
VMware创建Linux虚拟机之(五)Spark完全分布式部署教程
Hello,转眼间已到2022年底,学期末……总体来说,今年经历了很多,真正的成长了许多,成熟了许多。只能说,希望,明天依旧美好!!! 🐒本篇博客使用到的工具有:VMware16 ,Xftp7若不熟悉操作命令,推荐使用带GUI页面的CentOS7虚拟机我将使用带GUI页面的虚拟机演示虚拟机(Virtual Machine)指通过软件模拟的具有完整硬件系统功能的、运行在一个完全隔离环境中的完整计算机系统。在实体计算机中能够完成的工作在虚拟机中都能够实现。在计算机中创建虚拟机时,需要将实体机的部分硬盘和内存容量作为虚拟机的硬盘和内存容量。每个虚拟机都有独立的CMOS、硬盘和操作系统,可以像使用实体机一样对虚拟机进行操作。【确保服务器集群安装和配置已经完成!】可参考我的博客:VMware创建Linux虚拟机之(一)实现免密登录_Vim_飞鱼的博客-CSDN博客VMware创建Linux虚拟机之(二)下载安装JDK与配置Java环境变量_Vim_飞鱼的博客-CSDN博客VMware创建Linux虚拟机之(三)Hadoop安装与配置及搭建集群_Vim_飞鱼的博客-CSDN博客_利用vmware虚拟机安装hadoopVMware创建Linux虚拟机之(四)ZooKeeper&HBase完全分布式安装_Vim_飞鱼的博客-CSDN博客_在vmware中hbase的安装和配置前言请根据读者的自身情况,进行相应随机应变。我的三台CentOS7服务器:主机:master(192.168.149.101)从机:slave1(192.168.149.102)从机:slave2(192.168.149.103)每一个节点的安装与配置是相同的,在实际工作中,通常在master节点上完成安装和配置后,然后将安装目录通过 scp 命令复制到其他节点即可。注意:所有操作都是root用户权限,需要我们登陆时选择root用户登录。唯有热爱,可抵岁月漫长,唯有热爱,不畏世间无常!继Mapreduce之后,作为新一代并且是主流的计算引擎,学好Spark是非常重要的,这一篇博客会专门介绍如何部署一个分布式的Spark计算框架,在之后的博客中,会继续讲到Spark的基本模块的介绍以及底层原理,好了,废话不多说,直接开始吧!下载Spark安装包 部署Spark时,我们使用的版本如下所示:Apache Spark™ - Unified Engine for large-scale data analyticshttps://spark.apache.org/ 解压Spark安装包        首先,需要确保 network 网络已经配置好,使用 Xftp 等类似工具进行上传,把 spark-3.1.2-bin-hadoop3.2.tgz 上传到 opt/ 目录内。(也可使用U盘等工具拖拽)上传完成后,在 master 主机上执行以下代码: 解压Sparkcd /opt/ tar -zxvf spark-3.1.2-bin-hadoop3.2.tgz执行成功后,系统在 opt 目录自动创建 spark-3.1.2 子目录。  注意:可使用 ls 等命令查看文件解压是否无误。配置bashrc文件(等同于profile)#Spark export SPARK_HOME=/opt/spark-3.1.2 export PATH=$SPARK_HOME/bin:$PATH三台虚拟机均进行此操作  修改spark-env.sh文件[root@master conf]# cp spark-env.sh.template spark-env.sh [root@master conf]# vim spark-env.sh [root@master conf]# 添加如下内容:export JAVA_HOME=/opt/jdk1.8.0_261 export HADOOP_HOME=/opt/hadoop/hadoop export SPARK_MASTER_IP=master export SPARK_MASTER_PORT=7077 export SPARK_DIST_CLASSPATH=$(/opt/hadoop/hadoop/bin/hadoop classpath) export HADOOP_CONF_DIR=/opt/hadoop/hadoop/etc/hadoop export SPARK_YARN_USER_ENV="CLASSPATH=/opt/hadoop/hadoop/etc/hadoop" export YARN_CONF_DIR=/opt/hadoop/hadoop/etc/hadoop如下图所示:将spark拷贝到其他两个节点[root@master conf]# scp -r /opt/spark-3.1.2/ slave1:/opt/ [root@master conf]# scp -r /opt/spark-3.1.2/ slave2:/opt/启动spark集群并查看进程  master:[root@master opt]# /opt/spark-3.1.2/sbin/start-all.sh starting org.apache.spark.deploy.master.Master, logging to /opt/spark-3.1.2/logs/spark-root-org.apache.spark.deploy.master.Master-1-master.out slave1: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-3.1.2/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slave1.out slave2: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-3.1.2/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slave2.out master: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-3.1.2/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-master.out [root@master opt]# jps 3860 Worker 3677 Master 3886 Jpsslave1:[root@slave1 ~]# jps 1749 Worker 1806 Jps [root@slave1 ~]# slave2:[root@slave2 ~]# jps 3667 Jps 3590 Worker [root@slave2 ~]# 启动spark检测是否能正常启动启动local模式:spark-shell --master local[root@master hadoop]# cd /opt/spark-3.1.2/conf/ [root@master conf]# spark-shell --master local SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/spark-3.1.2/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/hadoop/hadoop/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 2022-12-21 22:01:54,437 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://leader:4040 Spark context available as 'sc' (master = local, app id = local-1671631329749). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 3.1.2 /_/ Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_261) Type in expressions to have them evaluated. Type :help for more information. scala> 结合实例,可以看到我们的 Spark 组件已经完美部署完成!💪请问,你学废了吗?
文章
分布式计算  ·  Hadoop  ·  Java  ·  Linux  ·  分布式数据库  ·  Apache  ·  虚拟化  ·  Spark  ·  Hbase
2023-01-17
VMware创建Linux虚拟机之(四)ZooKeeper&HBase完全分布式安装 下
HBase完全分布式安装  下载HBase安装包Apache HBase – Apache HBase Downloadshttps://hbase.apache.org/downloads.html上传至master虚拟机并解压HBasecd /opt/ tar -zxvf hbase-2.3.3.tar.gz #修改权限 sudo chmod -R 777 /opt/hbase-2.3.3配置环境变量[root@master ~]# vim /etc/bashrc #HBase export HBASE_HOME=/opt/hbase-2.3.3 export PATH=$PATH:$HBASE_HOME/bin #配置生效 [root@master ~]# source /etc/bashrc三台虚拟机均进行此操作  在这里我们可以使用 hbase -version  查看环境变量是否正确[root@master ~]# hbase -version java version "1.8.0_261" Java(TM) SE Runtime Environment (build 1.8.0_261-b12) Java HotSpot(TM) 64-Bit Server VM (build 25.261-b12, mixed mode) [root@master ~]# 配置hbase相关文件修改及配置 hbase-2.3.3/conf  目录下的  hbase-env.sh  文件[root@master ~]# cd /opt/hbase-3.6.2/conf [root@master ~]# vim hbase-env.sh #!/usr/bin/env bash # Where log files are stored. $HBASE_HOME/logs by default. # export HBASE_LOG_DIR=${HBASE_HOME}/logs # Enable remote JDWP debugging of major HBase processes. Meant for Core Developers # export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8070" # export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8071" # export HBASE_THRIFT_OPTS="$HBASE_THRIFT_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8072" # export HBASE_ZOOKEEPER_OPTS="$HBASE_ZOOKEEPER_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8073" # export HBASE_REST_OPTS="$HBASE_REST_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8074" # A string representing this instance of hbase. $USER by default. # export HBASE_IDENT_STRING=$USER # The scheduling priority for daemon processes. See 'man nice'. # export HBASE_NICENESS=10 # The directory where pid files are stored. /tmp by default. export HBASE_PID_DIR=/opt/hadoop/hadoop/pids # Seconds to sleep between slave commands. Unset by default. This # can be useful in large clusters, where, e.g., slave rsyncs can # otherwise arrive faster than the master can service them. # export HBASE_SLAVE_SLEEP=0.1 # Tell HBase whether it should manage it's own instance of ZooKeeper or not. # export HBASE_MANAGES_ZK=true # The default log rolling policy is RFA, where the log file is rolled as per the size defined for the # RFA appender. Please refer to the log4j.properties file to see more details on this appender. # In case one needs to do log rolling on a date change, one should set the environment property # HBASE_ROOT_LOGGER to "<DESIRED_LOG LEVEL>,DRFA". # For example: # HBASE_ROOT_LOGGER=INFO,DRFA # The reason for changing default to RFA is to avoid the boundary case of filling out disk space as # DRFA doesn't put any cap on the log size. Please refer to HBase-5655 for more context. # Tell HBase whether it should include Hadoop's lib when start up, # the default value is false,means that includes Hadoop's lib. export HBASE_DISABLE_HADOOP_CLASSPATH_LOOKUP="true" # Override text processing tools for use by these launch scripts. # export GREP="${GREP-grep}" # export SED="${SED-sed}" export JAVA_HOME=/opt/jdk1.8.0_261 export HBASE_HOME=/opt/hbase-2.3.3 export HBASE_MANAGES_ZK=false其中 HBASE_MANAGES_ZK=false 表示我们使用自己安装  zookeeper  集群而不是 hbase 自带的 zookeeper 集群修改及配置 hbase-2.3.3/conf 目录下的 hbase-site.xml 文件 [root@master ~]# vim hbase-site.xml <?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ --> <configuration> <property> <name>hbase.root.dir</name> <value>hdfs://master:9000/hbase</value> </property> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <property> <name>hbase.tmp.dir</name> <value>./tmp</value> </property> <property> <name>hbase.zookeeper.quorum</name> <value>master,slave1,slave2</value> </property> <property> <name>hbase.zookeeper.property.clientPort</name> <value>2181</value> </property> <property> <name>hbase.unsafe.stream.capability.enforce</name> <value>false</value> </property> <property> <name>hbase.zookeeper.property.dataDir</name> <value>/opt/zookeeper-3.6.2</value> </property> <property> <name>zookeeper.session.timeout</name> <value>300000</value>   <!--默认: 180000 :zookeeper 会话超时时间,单位是毫秒 --> </property> <property> <name>hbase.master.maxclockskew</name> <value>30000</value> </property> </configuration>添加 hbase 集群服务器的 ip 或者 hostname ,vi regionservers1. [root@master ~]# vim regionservers 2. 3. master 4. slave1 5. slave2将以上  Hbase 文件复制到三台服务器上scp -r /opt/hbase-2.3.3 root@slave1:/opt scp -r /opt/hbase-2.3.3 root@slave2:/opt完成即可   启动hbase配置完成hbase后将上面的所有文件复制到其他两台服务器上,然后使用  start-hbase.sh  命令启动hbase集群[root@master ~]# cd /opt/hbase-2.3.3/bin/ [root@master bin]# ls considerAsDead.sh hbase-config.cmd master-backup.sh start-hbase.sh draining_servers.rb hbase-config.sh region_mover.rb stop-hbase.cmd get-active-master.rb hbase-daemon.sh regionservers.sh stop-hbase.sh graceful_stop.sh hbase-daemons.sh region_status.rb test hbase hbase-jruby replication tmp hbase-cleanup.sh hirb.rb rolling-restart.sh zookeepers.sh hbase.cmd local-master-backup.sh shutdown_regionserver.rb hbase-common.sh local-regionservers.sh start-hbase.cmd [root@master bin]# start-hbase.sh running master, logging to /opt/hbase-2.3.3/logs/hbase-root-master-master.out slave1: running regionserver, logging to /opt/hbase-2.3.3/logs/hbase-root-regionserver-slave1.out slave2: running regionserver, logging to /opt/hbase-2.3.3/logs/hbase-root-regionserver-slave2.out master: running regionserver, logging to /opt/hbase-2.3.3/logs/hbase-root-regionserver-master.out [root@master bin]# 在哪台服务器使用上述命令启动则那台服务器即为 master 节点,使用 jps命令查看启动情况[root@master bin]# jps 7281 ResourceManager 8450 HMaster 6965 SecondaryNameNode 6535 NameNode 8619 HRegionServer 7693 QuorumPeerMain 8943 Jpsslave1[root@slave1 bin]# jps 2180 QuorumPeerMain 2509 Jps 1919 DataNode 2351 HRegionServer [root@slave1 bin]# slave2[root@slave2 ~]# jps 3441 QuorumPeerMain 3875 HRegionServer 4040 Jps 3165 DataNode [root@slave2 ~]# 可以看到服务器1启动和  HMaster 和 HRegionServer  进程,服务器2和服务器3启动和HRegionServer  进程。至此大功告成!!!困扰了我一个月的难题,终于解决了!!!🙇‍当然,我们也可以通过Web页面查看 Hbase 集群情况 : http://IP:16010加油(ง •_•)ง
文章
Linux  ·  分布式数据库  ·  Apache  ·  虚拟化  ·  Hbase
2023-01-17
flume应该思考的问题
flume是比较常用的大数据技术,那么学习flume,我们还需要思考flume,这样理解才能在遇到问题的时候,更容易解决,使用起来更加的得心应手。下面介绍了flume的相关内容及个人的理解。flume应用一般来讲,我们接触flume可能更早一些。flume如何安装可参考让你快速认识flume及安装和使用flume1.5传输数据(日志)到hadoop2.2http://www.aboutyun.com/forum.php?mod=viewthread&tid=7949如果你安装测试过flume,可以知道flume可以传递数据到另外的地方。比如我们可以传递本地文件到hadoop文件,比如搜集日志到hadoop,然后通过mapreduce或则spark处理。这也是比较常见的。flume解析flume有哪些内容,我们刚开始学习的时候,几乎都是复制黏贴的方式。对于它们几乎不怎么理解,或则只是停留在表面的理解。所以导致我们产生异常或则错误的时候,就不知道怎么解决了。这里解析下flume,可以知道我们在干什么,我们遇到错误的时候,能够知道哪里出现了问题。channel的作用flume传递数据,包含三个组件:source,channel,sink.那么如果我们来开发flume,我们会如何开发。好像channel这个不是必须的。因为有了数据源source和数据传递目标sink,应该就可以了。为何还需要channel。感觉channel是多此一举。从正常的角度来说channel确实是不需要的。但是有一个前提,source和sink要保持同步。也就说,source发送一条数据,sink需要立即消费和保存一条数据。下图是正常flume下图是去掉channel的flume。如果一旦数据源频率过快,sink来不及消费保存数据,那么就会造成丢失数据。如何定制flume一个灵活的程序,都是可以配置的,最常见的是xml格式文件,当然也可以是其它格式,普通txt也是可以的。所以我们看到无论是那种开源技术,都是可以配置的。甚至对于刚入门的初学者来说,就认为配置文件是必须的。所以我们这里所说的定制,是对flume的的定义。那么flume该如何定制。那就是通过对应source、channel、sink的定义。这里我们只接贴出配置文件agent1表示代理名称 agent1.sources=source1 agent1.sinks=sink1 agent1.channels=channel1 #配置source1 agent1.sources.source1.type=spooldir agent1.sources.source1.spoolDir=/usr/aboutyunlog agent1.sources.source1.channels=channel1 agent1.sources.source1.fileHeader = false #配置sink1 agent1.sinks.sink1.type=hdfs agent1.sinks.sink1.hdfs.path=hdfs://master:8020/aboutyunlog agent1.sinks.sink1.hdfs.fileType=DataStream agent1.sinks.sink1.hdfs.writeFormat=TEXT agent1.sinks.sink1.hdfs.rollInterval=4 agent1.sinks.sink1.channel=channel1 #配置channel1 agent1.channels.channel1.type=file agent1.channels.channel1.checkpointDir=/usr/aboutyun_tmp123 agent1.channels.channel1.dataDirs=/usr/aboutyun_tmp这里解析下上面的配置:首先是对flume的agent的配置。对于代理取了一个名字agent1。agent1里面包含三个组件,这三个组件也分别取一个名字:source1,sink1,channel1agent1表示代理名称agent1.sources=source1agent1.sinks=sink1agent1.channels=channel1我们为什么给他们取名字,是为了方便下面我们给他们定义。不取名字会带来什么后果。如下图,如果多个channel或则sink,我们就无法区分和定义了。取名之后,我们分别对他们定义。#配置source1 agent1.sources.source1.type=spooldir agent1.sources.source1.spoolDir=/usr/aboutyunlog agent1.sources.source1.channels=channel1 agent1.sources.source1.fileHeader = false先来解析一条agent1.sources.source1.type=spooldir上面是说agent1的数据源,source1的类型是spooldir。上面看上去很复杂,但是其实就定义了那么几项:spoolDir,channels,fileHeader 分别是目录,使用哪个channel,及是否添加Header等信息。sink同样的道理#配置sink1 agent1.sinks.sink1.type=hdfs agent1.sinks.sink1.hdfs.path=hdfs://master:8020/aboutyunlog agent1.sinks.sink1.hdfs.fileType=DataStream agent1.sinks.sink1.hdfs.writeFormat=TEXT agent1.sinks.sink1.hdfs.rollInterval=4 agent1.sinks.sink1.channel=channel1配置了sink1的类型,hdfs路径,file类型,格式,滚动时间,使用channel等。通过上面,我们或许就明白了,flume的各种配置。也能轻易读懂别人是如何配置的。flume与kafka整合flume与kafka整合应该是用的比较多的,而且这也是一个难点。这里只是简单说下。1.kafka作为数据源kafka作为数据源其实kafka消费者,从kafka topic读取消息。如果你有多个kafka数据源运行,你可以配置他们为同一个Consumer Group。它们只能读取topics的一个分区.这里只介绍下一些必须的属性,更多可参考官网属性名称:type值为 org.apache.flume.source.kafka.KafkaSourcekafka.bootstrap.servers值为kafka作为数据源的broker的列表。格式为host:端口例如localhost:9092kafka.consumer.group.id:这个不是必须的。默认为flume。kafka.topics:kafka消费者从topics 列表读取消息kafka.topics.regex:定义了一组topic.比 kafka.topics有更高的优先级.是对kafka.topics的重写。过时的属性topic,groupId,zookeeperConnect分别替换为kafka.topics,kafka.consumer.group.id,kafka.bootstrap.servers 链接 kafka cluster下面举两个例子多个topic的配置tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier1.sources.source1.channels = channel1 tier1.sources.source1.batchSize = 5000 tier1.sources.source1.batchDurationMillis = 2000 tier1.sources.source1.kafka.bootstrap.servers = localhost:9092 tier1.sources.source1.kafka.topics = test1, test2 tier1.sources.source1.kafka.consumer.group.id = custom.g.id使用正则订阅topictier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier1.sources.source1.channels = channel1 tier1.sources.source1.kafka.bootstrap.servers = localhost:9092 tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$ # the default kafka.consumer.group.id=flume is used当然还有使用认证的订阅,大家可以参考官网http://flume.apache.org/FlumeUserGuide.html2.Kafka作为Sink也就是说flume将数据发送到kakfa的topic。支持的kafka的版本为Kafka 0.9.x,0.8.x不在支持。属性介绍type必须设置为org.apache.flume.sink.kafka.KafkaSinkkafka.bootstrap.servers:连接的broker 列表kafka.topic:默认default-flume-topic,发布数据所到的topic过时的属性brokerList替换为kafka.bootstrap.serverstopic替换为kafka.topicbatchSize替换为kafka.flumeBatchSizerequiredAcks替换为kafka.producer.acks配置例子a1.sinks.k1.channel = c1 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = mytopic a1.sinks.k1.kafka.bootstrap.servers = localhost:9092 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 a1.sinks.ki.kafka.producer.compression.type = snappy3.kafka作为channelevents存储在kafka集群,kafka提供高可用和副本,因此如果客户端或则kafka broker崩溃的话,可以立即使用其它sinks。Kafka channel可以用于多种场景。1.Flume source 和sink--为events提供可靠及高可用的channel 2.lume source 和过滤器,没有sink.--为其它应用程序,允许写flume events到kafka topic3. Flume sink, 和没有source--它是低延迟,容错的方式去发送events,从kafka到Flume sinks 比如HDFS, HBase or Solrflume整合需要的kafka的版本为 0.9 及以后版本。channel 配置发生了变化,比以前的flume 版本。flume兼容以前版本,但是过时的属性会有警告。属性type值为org.apache.flume.channel.kafka.KafkaChannelkafka.bootstrap.servers值为hostname:port,可以是brokers list的一部分。但是值得注意的至少两个,这样可以高可用。kafka.topic默认为flume-channel        过时的属性brokerList替换为hostname:porttopic替换为kafka.topicgroupId替换为 kafka.consumer.group.idreadSmallestOffset替换为  kafka.consumer.auto.offset.reset例子:a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092 a1.channels.channel1.kafka.topic = channel1 a1.channels.channel1.kafka.consumer.group.id = flume-consumer除了上面kafka与flume整合,很多人对于kafka和flume有些混淆,下面内容,贴出来分享给大家网上有一些好的内容,拿出来分享给大家Kafka 与 Flume 很多功能确实是重复的。以下是评估两个系统的一些建议:Kafka 是一个通用型系统。你可以有许多的生产者和消费者分享多个主题。相反地,Flume 被设计成特定用途的工作,特定地向 HDFS 和 HBase 发送出去。Flume 为了更好地为 HDFS 服务而做了特定的优化,并且与 Hadoop 的安全体系整合在了一起。基于这样的结论,Hadoop 开发商 Cloudera 推荐如果数据需要被多个应用程序消费的话,推荐使用 Kafka,如果数据只是面向 Hadoop 的,可以使用 Flume。Flume 拥有许多配置的来源 (sources) 和存储池 (sinks)。然后,Kafka 拥有的是非常小的生产者和消费者环境体系,Kafka 社区并不是非常支持这样。如果你的数据来源已经确定,不需要额外的编码,那你可以使用 Flume 提供的 sources 和 sinks,反之,如果你需要准备自己的生产者和消费者,那你需要使用 Kafka。Flume 可以在拦截器里面实时处理数据。这个特性对于过滤数据非常有用。Kafka 需要一个外部系统帮助处理数据。无论是 Kafka 或是 Flume,两个系统都可以保证不丢失数据。然后,Flume 不会复制事件。相应地,即使我们正在使用一个可以信赖的文件通道,如果 Flume agent 所在的这个节点宕机了,你会失去所有的事件访问能力直到你修复这个受损的节点。使用 Kafka 的管道特性不会有这样的问题。Flume 和 Kafka 可以一起工作的。如果你需要把流式数据从 Kafka 转移到 Hadoop,可以使用 Flume 代理 (agent),将 kafka 当作一个来源 (source),这样可以从 Kafka 读取数据到 Hadoop。你不需要去开发自己的消费者,你可以使用 Flume 与 Hadoop、HBase 相结合的特性,使用 Cloudera Manager 平台监控消费者,并且通过增加过滤器的方式处理数据。来自:Kafka与 Flume 的区别记录分享http://www.aboutyun.com/forum.php?mod=viewthread&tid=21563 Flume :管道 ----个人认为比较适合有多个生产者场景,或者有写入Hbase、HDFS和kafka需求的场景。    Kafka :消息队列-----由于Kafka是Pull模式,因此适合有多个消费者的场景。cnblogshttp://www.cnblogs.com/ibyte/p/5830715.html
文章
消息中间件  ·  存储  ·  分布式计算  ·  安全  ·  Hadoop  ·  Kafka  ·  分布式数据库  ·  Spark  ·  数据格式  ·  Hbase
2023-01-14
Kafka-HBase-MapReduce-Mysql 连接实践 通话记录
1.项目介绍本项目采用的数据为通话记录数据,例(张三 李四 2021-4-23 12:32:13 2942)意思是张三在2021-4-23 12:32:13这个时间给李四通话,通话时长为2942秒数据来源【程序自己模拟数据的产生,交给Kafka的生产者】Kafka的消费者端接的是HBase数据库MapReduce读取HBase中的数据进行分析再将分析的数据导入MySQL2.各类介绍Produce模块DataProduce:主要负责生产数据Main:函数的入口testAPI:进行功能测试KafkaUtils:将数据发送到topicConsumer模块Main:程序的入口HBaseConsumer:消费者拉取数据HBaseDao:HBase的客户端对象,创建表导入数据HBaseUtils:主要是创建RowKey,还有一些建表和命名空间的操作Analysis模块HashUtils:将每个Cell中的数据存入到HashMap中MysqlUtils:主要是Mysql的连接操作CountMap:计算每个用户之间的通话记录次数DBWrite:实现了Writable、DBWritable,用于序列化以及写数据库操作3.项目各模块项目分为三个模块,分别是produce、consumer、analysisproduce:实现数据生产consumer:Kafka将数据写入HBaseanalysis:利用MapReduce分析数据将结果导入Mysql2.1 produce2.1.1 entrypublic class Main { public static void main(String[] args) throws ParseException, InterruptedException { //生产数据,发到Kafka KafkaUtils.writeDataIntoKafka(); } }2.1.2 dataProducepublic String produce(String startTime, String endTime) throws ParseException { // 张三 李四 2021-2-3 13:43:25 1204 initMetaData(); //获得随机下标来获得拨打电话者 int callerIndex = (int) (Math.random() * telePhone.size()); String callerName = phoneToName.get(telePhone.get(callerIndex)); //获得被拔打电话者 int calleeIndex; do { calleeIndex = (int) (Math.random() * telePhone.size()); } while (callerIndex == calleeIndex); String calleeName = phoneToName.get(telePhone.get(calleeIndex)); //定义解析时间的对象 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); //定义起止时间 Date startDate = null; Date endDate = null; //解析传入的时间字符串,将其转化成Date格式 startDate = sdf.parse(startTime); endDate = sdf.parse(endTime); //获得一个时间戳,来初始打电话的时间 long randomTs = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random()); Date resultDate = new Date(randomTs); //将初始化好的Date时间,转化成字符串 String resultTimeString = sdf.format(resultDate); //随机初始化小时、分钟、秒 int hour = (int) (Math.random() * 24); int minute = (int) (Math.random() * 60); int second = (int) (Math.random() * 60); //初始化具体时间,精确到小时、分钟、秒 String specificTime = String.format(String.format("%02d", hour) + ":" + String.format("%02d", minute) + ":" + String.format("%02d", second)); //定义时间跨度,表明电话的拨打时长 int duration = (int) (Math.random() * 3600); //拼接结果 张三 李四 2021-2-3 13:43:25 1204 String result = callerName + " " + calleeName + " " + resultTimeString + " " + specificTime + " " + duration; return result; }2.1.3 kafkaUtilspublic static void writeDataIntoKafka() throws ParseException, InterruptedException { //定义配置对象 Properties properties = new Properties(); //定义主机名 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); //字符串序列化的类 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //Kafka的主题 String topic = "telecom"; //定义一个生产者对象 KafkaProducer producer = new KafkaProducer<String, String>(properties); //循环发送数据到Kafka for (int i = 0; i < 1000; i++) { //按给定起止时间生成数据 String value = dataProduce.produce("2021-1-1", "2021-5-1"); //睡眠1秒 Thread.sleep(1000); //创建ProducerRecord对象 ProducerRecord<String, String> record = new ProducerRecord<>(topic, value); //发送数据 producer.send(record); } //关闭资源 producer.close(); }2.2 consumer2.2.1 entrypublic class Main { public static void main(String[] args) throws IOException, InterruptedException, ParseException { //创建HBase消费者 HBaseConsumer hBaseConsumer = new HBaseConsumer(); //从Kafka中获取数据输到HBase hBaseConsumer.getDataFromKafka(); } }2.2.2 hbase2.2.2.1 HBaseConsumerpublic class HBaseConsumer { public void getDataFromKafka() throws InterruptedException, IOException, ParseException { //定义配置对象 Properties properties = new Properties(); //连接主机名 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); //是否自动提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); //自动提交的时间间隔 properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); //消费者组名 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test3"); //字符串序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //创建消费者对象 KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties); //消费者订阅主题 consumer.subscribe(Arrays.asList("telecom")); //创建一个Dao对象,用于上传数据到HBase HBaseDao hBaseDao = new HBaseDao(); //从Kafka拉取数据 while (true) { //拉取的时间间隔 ConsumerRecords<String,String> records = consumer.poll(100); //拉取数据输到HBase for (ConsumerRecord<String,String> record : records) { String value = record.value(); System.out.println(value); Thread.sleep(1000); //上传数据 hBaseDao.put(value); } } } }2.2.2.2 HBaseDaopublic class HBaseDao { //命名空间 private String nameSpace; //表名 private String tableName; //配置对象 public static Configuration conf; //表对象 private Table table; //连接HBase对象 private Connection connection; //解析日期对象 private SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd"); private SimpleDateFormat sdf2 = new SimpleDateFormat("yyyyMMddHHmmss"); //初始化配置对象 static { conf = HBaseConfiguration.create(); } public HBaseDao() throws IOException { nameSpace = "telecom"; tableName = "teleRecord"; connection = ConnectionFactory.createConnection(conf); if (!HBaseUtils.isExistTable(conf, tableName)) { HBaseUtils.initNamespace(conf, nameSpace); HBaseUtils.createTable(conf, tableName, "f1", "f2"); } table = connection.getTable(TableName.valueOf(tableName)); } //将数据导入HBase public void put(String value) throws ParseException, IOException { //将Kafka拉取的数据切分 String[] splitValues = value.split(" "); String caller = splitValues[0]; String callee = splitValues[1]; String buildTime = splitValues[2]; String specificTime = splitValues[3]; String duration = splitValues[4]; //2021-03-21 12:23:04 buildTime = buildTime + " " + specificTime; //20210321122304 用于创建rowKey String buildTimeReplace = sdf2.format(sdf1.parse(buildTime)); //时间戳 String buildTimeTs = String.valueOf(sdf1.parse(buildTime).getTime()); //获得rowKey String rowKey = HBaseUtils.createRowKey(caller, callee, buildTimeReplace, "1", duration); //创建put对象 Put put = new Put(Bytes.toBytes(rowKey)); //添加各列属性 put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("caller"), Bytes.toBytes(caller)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callee"), Bytes.toBytes(callee)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time"), Bytes.toBytes(buildTime)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time_ts"), Bytes.toBytes(buildTimeTs)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("flag"), Bytes.toBytes("1")); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("duration"), Bytes.toBytes(duration)); //添加put table.put(put); } }2.2.3 hbaseUtilspublic class HBaseUtils { //判断表是否存在 public static boolean isExistTable(Configuration conf, String tableName) throws IOException { Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); boolean result = admin.tableExists(TableName.valueOf(tableName)); admin.close(); connection.close(); return result; } //判断命名空间是否存在 public static boolean isExistTableSpace(Configuration conf, String nameSpace) throws IOException { Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); boolean result = false; admin.close(); connection.close(); return result; } //创建命名空间 public static void initNamespace(Configuration conf, String nameSpace) throws IOException { Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); try { NamespaceDescriptor descriptor = NamespaceDescriptor.create(nameSpace).build(); admin.createNamespace(descriptor); } catch (NamespaceExistException e) { } finally { admin.close(); connection.close(); } } //创建表 public static void createTable(Configuration conf, String tableName, String... cfs) throws IOException { Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName)); for (String cf : cfs) { hTableDescriptor.addFamily(new HColumnDescriptor(cf)); } admin.createTable(hTableDescriptor); admin.close(); connection.close(); } //创建RowKey public static String createRowKey(String caller, String callee, String buildTime, String flag, String duration) { StringBuilder rowKey = new StringBuilder(); rowKey.append(caller + "_") .append(buildTime + "_") .append(callee + "_") .append(flag + "_") .append(duration); return rowKey.toString(); } }2.3 analysis2.3.1 hashUtilspublic class HashUtils { public static void putValue(Cell cell, HashMap<String, String> hashMap) { //获取cell中的列名 String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell)); //获取每列的值 String value = Bytes.toString(CellUtil.cloneValue(cell)); //存入map hashMap.put(qualifier, value); } }2.3.2 mysqlUtilspublic class MysqlUtils { public static Connection connection; public static String userName = "root"; public static String passwd = "123456"; public static PreparedStatement ps = null; //获得Connection对象 static { try { Class.forName("com.mysql.cj.jdbc.Driver"); } catch (ClassNotFoundException e) { e.printStackTrace(); } try { connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb" + "?useSSL=false" + "&allowPublicKeyRetrieval=true" + "&serverTimezone=UTC", userName, passwd); } catch (SQLException e) { e.printStackTrace(); } } //清空表数据 public static void deleteData(String tableName) throws SQLException { String sql = String.format("delete from %s", tableName); ps = connection.prepareStatement(sql); ps.executeUpdate(); } }2.3.3 hbaseToMR2.3.3.1 callCount2.3.3.1.1 Mappublic class CountMap extends TableMapper<Text, IntWritable> { //输出 张三 1 @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { HashMap<String, String> hashMap = new HashMap<>(); for (Cell cell : value.rawCells()) { HashUtils.putValue(cell, hashMap); } String caller = hashMap.get("caller"); String callee = hashMap.get("callee"); context.write(new Text(caller + "-" + callee), new IntWritable(1)); } }2.3.3.1.2 Reducepublic class CountReduce extends Reducer<Text, IntWritable, DBWrite, NullWritable> { //输出 张三 23 @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable value : values) { count += 1; } context.write(new DBWrite(key.toString(), count), NullWritable.get()); } }2.3.3.1.3 Driverpublic class CountDriver implements Tool { //配置对象 public static Configuration conf = null; //Mysql数据库表名 public static String mysqlTableName = "callcounts"; //Mysql表中列名 public static String[] fieldNames = {"callercallee", "counts"}; //Mysql驱动类 public static String driverClass = "com.mysql.cj.jdbc.Driver"; //Mysql的URL public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" + "?useSSL=false" + "&allowPublicKeyRetrieval=true" + "&serverTimezone=UTC"; //Mysql的用户名 public static String userName = "root"; //Mysql的用户密码 public static String passwd = "123456"; @Override public int run(String[] strings) throws Exception { //配置Mysql DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd); //清空表 MysqlUtils.deleteData(mysqlTableName); //获得job对象 Job job = Job.getInstance(conf); //关联Jar job.setJarByClass(CountDriver.class); //配置MapperJob TableMapReduceUtil.initTableMapperJob("teleRecord", new Scan(), CountMap.class, Text.class, IntWritable.class, job); //关联Reduce类 job.setReducerClass(CountReduce.class); job.setOutputKeyClass(DBWrite.class); job.setOutputValueClass(NullWritable.class); //设置输出类型 job.setOutputFormatClass(DBOutputFormat.class); DBOutputFormat.setOutput(job, mysqlTableName, fieldNames); //提交job任务 boolean result = job.waitForCompletion(true); return result ? 0 : 1; } @Override public void setConf(Configuration configuration) { conf = configuration; conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104"); } @Override public Configuration getConf() { return conf; } public static void main(String[] args) { Configuration conf = new Configuration(); try { int run = ToolRunner.run(conf, new CountDriver(), args); System.exit(run); } catch (Exception e) { e.printStackTrace(); } } }2.3.3.1.4 DBWritepublic class DBWrite implements Writable, DBWritable { String caller_callee = ""; int count = 0; public DBWrite(){} public DBWrite(String caller_callee, int count){ this.caller_callee=caller_callee; this.count=count; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(caller_callee); out.writeInt(count); } @Override public void readFields(DataInput in) throws IOException { this.caller_callee = in.readUTF(); this.count = in.readInt(); } @Override public void write(PreparedStatement preparedStatement) throws SQLException { preparedStatement.setString(1, caller_callee); preparedStatement.setInt(2, count); } @Override public void readFields(ResultSet resultSet) throws SQLException { this.caller_callee = resultSet.getString(1); this.count = resultSet.getInt(2); } }2.3.3.2 callerDuration2.3.3.2.1 Mappublic class DurationMap extends TableMapper<Text, LongWritable> { //输出 张三 2041 @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { //创建HashMap对象,为了下面取出对应值用 HashMap<String, String> hashMap = new HashMap<>(); //迭代rowkey对应的每个单元 for (Cell cell : value.rawCells()) { HashUtils.putValue(cell, hashMap); } //获得电话发起人 String caller = hashMap.get("caller"); //获得每次电话时长 String duration = hashMap.get("duration"); //写出 context.write(new Text(caller), new LongWritable(Long.valueOf(duration))); } }2.3.3.2.2 Reducepublic class DurationReduce extends Reducer<Text, LongWritable, DBWrite, NullWritable> { //输出 张三 4204 @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { //存储每个人拨打电话的总时长 long sum = 0; //迭代每个时长 for (LongWritable value : values) { sum += value.get(); } //将结果写出 context.write(new DBWrite(key.toString(), String.valueOf(sum)), NullWritable.get()); } }2.3.3.2.3 Driverpublic class DurationDriver implements Tool { //配置对象 public static Configuration conf = null; //Mysql数据库表名 public static String mysqlTableName = "callerdurations"; //Mysql表中列名 public static String[] fieldNames = {"caller", "durations"}; //Mysql驱动类 public static String driverClass = "com.mysql.cj.jdbc.Driver"; //Mysql的URL public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" + "?useSSL=false" + "&allowPublicKeyRetrieval=true" + "&serverTimezone=UTC"; //Mysql的用户名 public static String userName = "root"; //Mysql的用户密码 public static String passwd = "123456"; @Override public int run(String[] strings) throws Exception { //配置Mysql DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd); //清空表 MysqlUtils.deleteData(mysqlTableName); //获得job对象 Job job = Job.getInstance(conf); //关联Jar job.setJarByClass(DurationDriver.class); //配置MapperJob TableMapReduceUtil.initTableMapperJob("teleRecord", new Scan(), DurationMap.class, Text.class, LongWritable.class, job); //关联Reduce类 job.setReducerClass(DurationReduce.class); job.setOutputKeyClass(DBWrite.class); job.setOutputValueClass(NullWritable.class); //设置输出类型 job.setOutputFormatClass(DBOutputFormat.class); DBOutputFormat.setOutput(job, mysqlTableName, fieldNames); //提交job任务 boolean result = job.waitForCompletion(true); return result ? 0 : 1; } @Override public void setConf(Configuration configuration) { conf = configuration; conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104"); } @Override public Configuration getConf() { return conf; } public static void main(String[] args) { Configuration conf = new Configuration(); try { int run = ToolRunner.run(conf, new DurationDriver(), args); System.exit(run); } catch (Exception e) { e.printStackTrace(); } } }2.3.3.3 dayCountDuration2.3.3.3.1 Mappublic class dayCountDurationMap extends TableMapper<Text, LongWritable> { //2021-01-13 3042 @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { HashMap<String, String> hashmap = new HashMap<>(); for (Cell cell : value.rawCells()) { HashUtils.putValue(cell, hashmap); } String date = hashmap.get("build_time").substring(0, 10); String duration = hashmap.get("duration"); context.write(new Text(date), new LongWritable(Long.valueOf(duration))); } }2.3.3.3.2 Reducepublic class dayCountDurationReduce extends Reducer<Text, LongWritable, DBWrite, NullWritable> { //输出 2021-01-13 2042 @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long durations = 0; for (LongWritable value : values) { durations += value.get(); } context.write(new DBWrite(key.toString(), durations), NullWritable.get()); } }2.3.3.3.3 Driverpublic class dayCountDurationDriver implements Tool { //配置对象 public static Configuration conf = null; //Mysql数据库表名 public static String mysqlTableName = "daydurations"; //Mysql表中列名 public static String[] fieldNames = {"date", "durations"}; //Mysql驱动类 public static String driverClass = "com.mysql.cj.jdbc.Driver"; //Mysql的URL public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" + "?useSSL=false" + "&allowPublicKeyRetrieval=true" + "&serverTimezone=UTC"; //Mysql的用户名 public static String userName = "root"; //Mysql的用户密码 public static String passwd = "123456"; @Override public int run(String[] strings) throws Exception { //配置Mysql DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd); //清空表 MysqlUtils.deleteData(mysqlTableName); //获得job对象 Job job = Job.getInstance(conf); //关联Jar job.setJarByClass(dayCountDurationDriver.class); //配置MapperJob TableMapReduceUtil.initTableMapperJob("teleRecord", new Scan(), dayCountDurationMap.class, Text.class, LongWritable.class, job); //关联Reduce类 job.setReducerClass(dayCountDurationReduce.class); job.setOutputKeyClass(DBWrite.class); job.setOutputValueClass(NullWritable.class); //设置输出类型 job.setOutputFormatClass(DBOutputFormat.class); DBOutputFormat.setOutput(job, mysqlTableName, fieldNames); //提交job任务 boolean result = job.waitForCompletion(true); return result ? 0 : 1; } @Override public void setConf(Configuration configuration) { conf = configuration; conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104"); } @Override public Configuration getConf() { return conf; } public static void main(String[] args) { Configuration conf = new Configuration(); try { int run = ToolRunner.run(conf, new dayCountDurationDriver(), args); System.exit(run); } catch (Exception e) { e.printStackTrace(); } } }4 项目源码Github地址
文章
消息中间件  ·  分布式计算  ·  关系型数据库  ·  MySQL  ·  Kafka  ·  测试技术  ·  分布式数据库  ·  数据库  ·  Hbase
2023-01-17
1 2 3 4 5 6 7 8 9
...
20
跳转至:
数据库
252297 人关注 | 50729 讨论 | 94276 内容
+ 订阅
  • SpringSession的源码解析(从Cookie中读取Sessionid,根据sessionid查询信息全流程分析)
  • ORA-12705: Cannot access NLS data files or invalid environment
  • 【Spark学习笔记】- 初始 Apache Spark
查看更多 >
大数据
188010 人关注 | 29189 讨论 | 79997 内容
+ 订阅
  • 【Spark学习笔记】- 初始 Apache Spark
  • SpringSession的源码解析(生成session,保存session,写入cookie全流程分析)
  • 【Spark学习笔记】- 数据科学面临的挑战
查看更多 >
开发与运维
5603 人关注 | 131412 讨论 | 300421 内容
+ 订阅
  • ORA-12705: Cannot access NLS data files or invalid environment
  • 【Spark学习笔记】- 初始 Apache Spark
  • SpringSession的源码解析(生成session,保存session,写入cookie全流程分析)
查看更多 >
云计算
21776 人关注 | 59338 讨论 | 55519 内容
+ 订阅
  • 【前端面试知识题】- 性能优化
  • 【前端面试题】- 前端工程化的
  • 一寸宕机一寸血,十万容器十万兵|Win10/Mac系统下基于Kubernetes(k8s)搭建Gunicorn+Flask高可用Web集群
查看更多 >
微服务
22991 人关注 | 11310 讨论 | 33077 内容
+ 订阅
  • SpringSession的源码解析(生成session,保存session,写入cookie全流程分析)
  • linux进程通讯(IPC)--大总结梳理
  • Java高手速成 | Spring、JPA与Hibernate的整合
查看更多 >