hiekay个人页面-阿里云开发者社区

个人介绍

java 数据分析 数据可视化 大数据

擅长的技术

  • Java
获得更多能力
通用技术能力:

暂时未有相关通用技术能力~

云产品技术能力:

暂时未有相关云产品技术能力~

阿里云技能认证

详细说明
  • 高分内容
  • 最新动态
  • 文章
  • 问答
正在加载, 请稍后...
滑动查看更多

2021年01月

  • 01.22 15:12:09
    发表了文章 2021-01-22 15:12:09

    mac 安装tensorflow

    mac 安装tensorflow
  • 01.22 14:02:25
    发表了文章 2021-01-22 14:02:25

    解决mac python2 python3 pip pip3 的指向问题

    解决mac python2 python3 pip pip3 的指向问题

2020年11月

  • 11.01 19:05:51
    发表了文章 2020-11-01 19:05:51

    python实现双均线策略

    python实现双均线策略

2020年09月

  • 09.24 00:48:26
    发表了文章 2020-09-24 00:48:26

    wordpress 网站安装主题

    wordpress 网站安装主题
  • 09.18 23:34:05
    发表了文章 2020-09-18 23:34:05

    centos 7 搭建wordpress 网站详细教程

    centos 7 搭建wordpress 网站详细教程
正在加载, 请稍后...
滑动查看更多
  • 发表了文章 2021-01-22

    mac 安装tensorflow

  • 发表了文章 2021-01-22

    解决mac python2 python3 pip pip3 的指向问题

  • 发表了文章 2020-11-01

    python实现MACD均线择时策略

  • 发表了文章 2020-11-01

    python实现双均线策略

  • 发表了文章 2020-11-01

    量化投资(一)

  • 发表了文章 2020-09-24

    wordpress 网站安装主题

  • 发表了文章 2020-09-18

    centos 7 搭建wordpress 网站详细教程

  • 发表了文章 2019-06-25

    mac 中安装maven

  • 发表了文章 2019-05-07

    linux expect 自动交互脚本用法

  • 发表了文章 2019-04-12

    ubuntu apache2 配置安装ssl证书

  • 发表了文章 2019-04-11

    Ubuntu 安装php环境 lamp 搭建网站

  • 发表了文章 2019-03-12

    yaml 语言入门

  • 发表了文章 2019-03-08

    网站架构的逐步优化演变

  • 发表了文章 2019-03-06

    linux 本地终端 SSH 连接 gcp (Google Cloud Platform ) 配置教程

  • 发表了文章 2019-03-01

    系统架构 一致性问题 : 库存扣减

  • 发表了文章 2019-02-15

    linux sed 总结

  • 发表了文章 2019-02-15

    perl语言 入门

  • 发表了文章 2019-02-12

    linux 的awk命令

  • 发表了文章 2019-02-12

    磁盘阵列 关于Raid0,Raid1,Raid5,Raid10

  • 发表了文章 2019-02-02

    nginx 编译出现的问题ngx_murmurhash.o failed

正在加载, 请稍后...
滑动查看更多
  • 回答了问题 2019-07-17

    HBase +Spark 社区活动的直播会录下来吗?

    会的

    踩0 评论0
  • 回答了问题 2019-07-17

    Procedure BitzsetNode已经可以容纳多个Long了吧

    不可以吧

    踩0 评论0
  • 回答了问题 2019-07-17

    后续hbase会引入数据的强强一致性吗?

    以后可能会的

    踩0 评论0
  • 回答了问题 2019-07-17

    想学习hbase源码,建议从哪块入手呢?

    里面总共包括几个主要的工程吧:hbase-common,hbase-client,hbase-prefix-tree,hbase-server,test-hbase,hbase-compact*,hbase-thrift,hbase-protocol等工程。

    其中hbase-common是基础的,工程里面的jar包都放在它下面的lib目录,hbase-compact*那两个工程一直有问题,我在hbase-server下面的lib目录放了两个jar包,没引用这两个工程,就是放一些监控信息的,可以删掉。

    踩0 评论0
  • 回答了问题 2019-07-17

    hbase作为实时的储存数据库,用spark和fink怎么实现呢?

    前言
    之前因为仅仅是把HBase当成一个可横向扩展并且具有持久化能力的KV数据库,所以只用在了指标存储上,参看很早之前的一篇文章基于HBase做Storm 实时计算指标存储。这次将HBase用在了用户行为存储上,因为Rowkey的过滤功能也很不错,可以很方便的把按人或者内容的维度过滤出所有的行为。从某种意义上,HBase的是一个有且仅有一个多字段复合索引的存储引擎。

    虽然我比较推崇实时计算,不过补数据或者计算历史数据啥的,批处理还是少不了的。对于历史数据的计算,其实我是有两个选择的,一个是基于HBase的已经存储好的行为数据进行计算,或者基于Hive的原始数据进行计算,最终选择了前者,这就涉及到Spark(StreamingPro) 对HBase的批处理操作了。

    整合过程
    和Spark 整合,意味着最好能有Schema(Mapping),因为Dataframe 以及SQL API 都要求你有Schema。 遗憾的是HBase 有没有Schema取决于使用者和场景。通常SparkOnHBase的库都要求你定义一个Mapping(Schema),比如hortonworks的 SHC(https://github.com/hortonworks-spark/shc) 就要求你定义一个如下的配置:

    {
    "rowkey":"key",
    "table":{"namespace":"default", "name":"pi_user_log", "tableCoder":"PrimitiveType"},
    "columns":{"col0":{"cf":"rowkey", "col":"key", "type":"string"},
    "col1":{"cf":"f","col":"col1", "type":"string"}
    }
    }
    看上面的定义已经还是很容易看出来的。对HBase的一个列族和列取一个名字,这样就可以在Spark的DataSource API使用了,关于如何开发Spark DataSource API可以参考我的这篇文章利用 Spark DataSource API 实现Rest数据源中使用,SHC大体实现的就是这个API。现在你可以这么用了:

    val cat = "{n"rowkey":"key","table":{"namespace":"default", "name":"pi_user_log", "tableCoder":"PrimitiveType"},n"columns":{"col0":{"cf":"rowkey", "col":"key", "type":"string"},n"28360592":{"cf":"f","col":"28360592", "type":"string"}n}n}"

    val cc = sqlContext
      .read
      .options(Map(HBaseTableCatalog.tableCatalog -> cat))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()

    不过当你有成千上万个列,那么这个就无解了,你不大可能一一定义,而且很多时候使用者也不知道会有哪些列,列名甚至可能是一个时间戳。我们现在好几种情况都遇到了,所以都需要解决:

    自动获取HBase里所有的列形成Schema,这样就不需要用户配置了。
    规定HBase只有两个列,一个rowkey,一个 content,content 是一个map,包含所有以列族+列名为key,对应内容为value。
    先说说第二种方案(因为其实第一种方案也要依赖于第二种方案):

    {

        "name": "batch.sources",
        "params": [
          {
            "inputTableName": "log1",
            "format": "org.apache.spark.sql.execution.datasources.hbase.raw",
            "path": "-",
            "outputTable": "log1"
          }
        ]
      },
      {
        "name": "batch.sql",
        "params": [
          {
            "sql": "select rowkey,json_value_collect(content) as actionList from log1",
            "outputTableName":"finalTable"
          }
        ]
      },

    首先我们配置了一个HBase的表,叫log1,当然,这里是因为程序通过hbase-site.xml获得HBase的链接,所以配置上你看不到HBase相关的信息。接着呢,在SQL 里你就可以对content 做处理了。我这里是把content 转化成了JSON格式字符串。再之后你就可以自己写一个UDF函数之类的做处理了,从而实现你复杂的业务逻辑。我们其实每个字段里存储的都是JSON,所以我其实不关心列名,只要让我拿到所有的列就好。而上面的例子正好能够满足我这个需求了。

    而且实现这个HBase DataSource 也很简单,核心逻辑大体如下:

    case class HBaseRelation(

                          parameters: Map[String, String],
                          userSpecifiedschema: Option[StructType]
                        )(@transient val sqlContext: SQLContext)

    extends BaseRelation with TableScan with Logging {

    val hbaseConf = HBaseConfiguration.create()

    def buildScan(): RDD[Row] = {

    hbaseConf.set(TableInputFormat.INPUT_TABLE, parameters("inputTableName"))
    val hBaseRDD = sqlContext.sparkContext.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
      .map { line =>
        val rowKey = Bytes.toString(line._2.getRow)
    
        import net.liftweb.{json => SJSon}
        implicit val formats = SJSon.Serialization.formats(SJSon.NoTypeHints)
    
        val content = line._2.getMap.navigableKeySet().flatMap { f =>
          line._2.getFamilyMap(f).map { c =>
            (Bytes.toString(f) + ":" + Bytes.toString(c._1), Bytes.toString(c._2))
          }
        }.toMap
    
        val contentStr = SJSon.Serialization.write(content)
    
        Row.fromSeq(Seq(UTF8String.fromString(rowKey), UTF8String.fromString(contentStr)))
      }
    hBaseRDD

    }
    }
    那么我们回过头来,如何让Spark自动发现Schema呢?大体你还是需要过滤所有数据得到列的合集,然后形成Schema的,成本开销很大。我们也可以先将我们的数据转化为JSON格式,然后就可以利用Spark已经支持的JSON格式来自动推倒Schema的能力了。

    总体而言,其实并不太鼓励大家使用Spark 对HBase进行批处理,因为这很容易让HBase过载,比如内存溢出导致RegionServer 挂掉,最遗憾的地方是一旦RegionServer 挂掉了,会有一段时间读写不可用,而HBase 又很容易作为实时在线程序的存储,所以影响很大。

    踩0 评论0
  • 回答了问题 2019-07-17

    hbase的mttr怎么样,阿里云有特殊的优化么?

    Hbase 是一个always-available的服务,在机器故障的时候保持可用性,集群中的机器都运行regionserver daemons。但一个regionserver出现故障,或者机器掉线,那么保存在上面的regions也同样掉线。Hbase中MTTR的能够检测异常,尽可能早的恢复对掉线region的访问。
    文章解释了hbase如何管理MTTR,并且介绍了一些hbase和hdfs的设置。

    Hbase是一致性时对故障的如何保持弹性

    Hbase通过让一个单独的server负责数据的子集,也就是说,一个region在同一时刻只能被一个region server进行管理。

    Hbase对失败的弹性,归功于hdfs,因为在写数据的时候,数据会被复制到到不通的节点。
    Hbase将数据写入到hfiles中,hfile保存在hdfs上面,hdfs完成对hfiles的block的副本(replica)。默认情况下是副本数是3.
    Hbase使用commit log(或者称之为Write-Ahead-log,WAL),提交日志也同样写在HDFS上面,默认副本数为3.

    Hbase在故障检测以及访问恢复的步骤:
    识别宕机的节点(Identifying that a node is down):由于过载或者直接死掉,节点会不再响应。
    在write操作进行中的时候进行恢复(Recovering the writes in progress):通过读取commit log,恢复还没有被flush的edit。
    重新分配region:失败的regionserver之前在正在处理一些region,这些region需要被重新分配给其他的RS(regionSever),这个分配过程要根据每台RS不通的workload的情况
    那上面三个过程中哪一个步骤最痛苦?在检测以及恢复步骤发生的时候,客户端会被阻塞。MTTR的作用就是加速这个处理过程,让客户端对数据downtime的感知时间尽可能的短。

    检测失败节点:
    一个RS的失败原因很多:正常的情况下,可能是由于clean stop,比如管理员关闭了某个节点,这就允许RS能安全适当的关闭region,然后告诉HMaster,正在关闭。这种情况下,commit log会被清除,然后HMaster会立刻安排region的重新分配。
    另外的regionserver关闭的的情况:比如运行RS的计算机静默死亡(silent death),比如网络原因。导致region server不能够发出告警,这种情况有Zookeeper进行处理。 每一个RS都会连接到Zookeeper上面,而Master监测这些连接,Zookeeper自己管理heartbeat。所以timeout出现的时候,Hmaster会申明region server 已经死亡,启动一个恢复处理过程

    恢复正在进行中的写操作。
    当一个RS 出现了宕机,那么commit logs的恢复工作就发生了。这个恢复工作是并行开始的。
    第一步:随机的RS会提取commit logs(从设置好的commit log 目录中),并且按照region来分割日志成多个文件,保存在HDFS上面,然后region会被重新分配给其他任意的RS,
    然后每一个被分配的RS的都会去读取已经前面分割好的对应的region 日志文件,并且进行将该region恢复到正确状态。

    当出问题的是一个node 失败了,而不是一个进程崩溃了,那么问题就会出现了。
    出现进程崩溃的的regionserver,会将数据写入到处于同一台机上面的datanode上面。假设副本因素为3,那么当一个node丢失的时候,你丢失的不仅仅是一个region server,并且还有这个数据的一个副本。 进行split,就意味着要读取block,而1/3的副本已经死了,那么对于每一个block,你会有三分之一的几率被引导到错误的副本上面。还有,split操作需要创建新的文件。每一个文件都会有3个副本,这些副本可能会被指派给已经丢失的datanode,而这个write数据的过程由于datanode已经死亡,会在经过一个timeout之后失败,而重新回转到另外一个datanode上面,这就延缓了数据的恢复的过程。

    重新分配region
    分配region的工作会进行的很快,这依赖于Zookeeper,需要通过Zookeeper完成master和region server的异步工作。

    MTTR带来的提升(The MTTR improvements)
    检测失败节点:
    首先,可以减小默认的timeout时间。Hbase 被配置成3分钟的Zookeepertimeout时间,这就保证了GC不会介入进来(GC parse会导致ZK的timeout,会导致错误的failure检测。)
    对于生产系统,关注MTTR的话,设置timeout时间为1分钟,或者30秒,是很有必要的。
    一个合理的最小设置时20秒。所以你可以设置hbase.zookeeper.timeout=60000
    你同样设置你GC(incremental, generational GC with good figures for the young and old generations, etc., this is a topic by itself) ,这样使GC的暂停时间不要超过ZK timeout。

    恢复正在进行中的写操作。
    在正常情况下,会有足够的活跃的RS来并行的完成commit log files的split工作,所以问题的就转到能否直接找到HDFS上面仅存的副本。该问题的解决方案是,配置HDFS,是hdfs对于故障的检测快于hbase的检测。那就是说,如果hbase的timeout为60s,HDFS应该设置成20s(就是设置成20s之后就认为node已经死亡)。在这里我们要描述一下HDFS是如何处理dead node。HDFS的故障检测也同样是依赖于heartbeat和timeout,在HDFS中,如果一个node被申明为dead,那么保存在该datanode上面的replicas将会被复制到其他活跃的datanode上面去,而这个是一个消耗很大的过程,并且,如果多个datanode同事死亡,那么这就会引发“replication storms”,replication storms指所有的副本被重新拷贝,这会加剧系统负载,从而导致某些节点不响应,进而导致这些节点被NN视为已经死亡,而这些节点上面block又要重新被复制,周而复始,这样的replication storms实在是可怕了。
    因此,HDFS在开始恢复过程之前会等待一段时间,会比10分钟长一点。而这一点,对于低延时系统来说就是一个问题:访问dead datanode就会促发timeout。在HDFS versions 1.0.4 or 1.2, and branches 2 and 3中,引入一个新的状态:stale。当一个datanode不再发送hearbeat,并且这个时间持续到一个指定的时间,那么datanode处于stale状态。 处于stale状态的节点就是在读写过程中最后一个选择(a last resort for reads),所以启用这种性质,会是恢复更加快。
    设置启用stale的方法:修改hdfs-site.xml


    dfs.namenode.avoid.read.stale.datanode
    true


    dfs.namenode.avoid.write.stale.datanode
    true


    dfs.namenode.write.stale.datanode.ratio
    1.0f



    dfs.namenode.check.stale.datanode
    true

    注:我在cloudera cdh-4.1.2 提供的doc的配置项目文档中没有找到该参数,看了源码才找到这些参数。
    具体参考HDFS-3912, HDFS-4350:

    重新分配region
    Region分配的是纯粹hbase的内部实现,在Hbase 0.94+的版本中,region 分配过程的处理被优化了,允许在很短的时间异步分贝更多的region。
    具体可以参考[example - Apache jira HBASE-7247].

    结论。
    在Hbase中没有global failure,如果一个region server 失败了,其他的region server 仍然可用,对于给定的一个数据集,MTTR 通常是 10分钟左右。
    该经验数值是从是从比较普遍的情况是中得到的,因为需要使用dead datanode的节点上面副本数,恢复就需要时间。HDFS需要花费10分钟的时间来申明datanode死亡了。 当引入了stale状态的时候,这就不再是一个问题了。这个时候恢复时间就变成Hbase本身的问题,如果你需要考虑MTTR,那么你采用这里的设置,从节点真的出现失败,到数据在其他RS上面又重新变得可用,这个过程只需要2分钟,或者更少。

    踩0 评论0
  • 回答了问题 2019-07-17

    HBase2.X通过Netty来实现RPC 那内存泄漏是怎么做监控

    引用计数
    netty中使用引用计数机制来管理资源,当一个实现ReferenceCounted的对象实例化时,引用计数置1.
    客户代码中需要保持一个该对象的引用时需要调用接口的retain方法将计数增1.对象使用完毕时调用release将计数减1.
    当引用计数变为0时,对象将释放所持有的底层资源或将资源返回资源池.

    内存泄露
    按上述规则使用Direct和Pooled的ByteBuf尤其重要.对于DirectBuf,其内存不受VM垃圾回收控制只有在调用release导致计数为0时才会主动释放内存,而PooledByteBuf只有在release后才能被回收到池中以循环利用.
    如果客户代码没有按引用计数规则使用这两种对象,将会导致内存泄露.

    内存使用跟踪
    在netty.io.util包中含有如下两个类
    ResourceLeak 用于跟踪内存泄露
    ResourceLeakDetector 内存泄露检测工具
    在io.netty.buffer.AbstractByteBufAllocator类中有如下代码

    [java] view plaincopy在CODE上查看代码片派生到我的代码片

    //装饰器模式,用SimpleLeakAwareByteBuf或AdvancedLeakAwareByteBuf来包装原始的ByteBuf
    //两个包装类均通过调用ResourceLeak的record方法来记录ByteBuf的方法调用堆栈,区别在于后者比前者记录更多的内容
    protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {

    ResourceLeak leak;  
    //根据设置的Level来选择使用何种包装器  
    switch (ResourceLeakDetector.getLevel()) {  
        case SIMPLE:  
            //创建用于跟踪和表示内容泄露的ResourcLeak对象  
            leak = AbstractByteBuf.leakDetector.open(buf);  
            if (leak != null) {  
                //只在ByteBuf.order方法中调用ResourceLeak.record  
                buf = new SimpleLeakAwareByteBuf(buf, leak);  
            }  
            break;  
        case ADVANCED:  
        case PARANOID:  
            leak = AbstractByteBuf.leakDetector.open(buf);  
            if (leak != null) {  
                //在ByteBuf几乎所有方法中调用ResourceLeak.record    
                buf = new AdvancedLeakAwareByteBuf(buf, leak);  
            }  
            break;  
    }  
    return buf;  

    }

    下图展示了该方法被调用的时机.可见Netty只对PooledByteBuf和DirectByteBuf监控内存泄露.

    内存泄露检测

    下面观察上述代码中的AbstractByteBuf.leakDetector.open(buf);

    实现代码如下
    [java] view plaincopy在CODE上查看代码片派生到我的代码片

    //创建用于跟踪和表示内容泄露的ResourcLeak对象
    public ResourceLeak open(T obj) {

    Level level = ResourceLeakDetector.level;  
    if (level == Level.DISABLED) {//禁用内存跟踪  
        return null;  
    }  
    if (level.ordinal() < Level.PARANOID.ordinal()) {  
        //如果监控级别低于PARANOID,在一定的采样频率下报告内存泄露  
        if (leakCheckCnt ++ % samplingInterval == 0) {  
            reportLeak(level);  
            return new DefaultResourceLeak(obj);  
        } else {  
            return null;  
        }  
    } else {  
        //每次需要分配 ByteBuf 时,报告内存泄露情况  
        reportLeak(level);  
        return new DefaultResourceLeak(obj);  
    }  

    }
    其中reportLeak方法中完成对内存泄露的检测和报告,如下面代码所示.

    [java] view plaincopy在CODE上查看代码片派生到我的代码片

    private void reportLeak(Level level) {

    //......  
    
    // 报告生成了太多的活跃资源  
    int samplingInterval = level == Level.PARANOID? 1 : this.samplingInterval;  
    if (active * samplingInterval > maxActive && loggedTooManyActive.compareAndSet(false, true)) {  
        logger.error("LEAK: You are creating too many " + resourceType + " instances.  " +  
                resourceType + " is a shared resource that must be reused across the JVM," +  
                "so that only a few instances are created.");  
    }  
    
    // 检测并报告之前发生的内存泄露  
    for (;;) {  
        @SuppressWarnings("unchecked")  
        //检查引用队列(为什么通过检查该队列,可以判断是否存在内存泄露)  
        DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();  
        if (ref == null) {//队列为空,没有未报告的内存泄露或者从未发生内存泄露  
            break;  
        }  

    //清理引用

        ref.clear();  
    
        if (!ref.close()) {  
            continue;  
        }  
        //通过错误日志打印资源的方法调用记录,并将其保存在reportedLeaks中  
        String records = ref.toString();  
        if (reportedLeaks.putIfAbsent(records, Boolean.TRUE) == null) {  
            if (records.isEmpty()) {  
                logger.error("LEAK: {}.release() was not called before it's garbage-collected. " +  
                        "Enable advanced leak reporting to find out where the leak occurred. " +  
                        "To enable advanced leak reporting, " +  
                        "specify the JVM option '-D{}={}' or call {}.setLevel()",  
                        resourceType, PROP_LEVEL, Level.ADVANCED.name().toLowerCase(), simpleClassName(this));  
            } else {  
                logger.error(  
                        "LEAK: {}.release() was not called before it's garbage-collected.{}",  
                        resourceType, records);  
            }  
        }  
    }  

    }
    综合上面的三段代码,可以看出, Netty 在分配新 ByteBuf 时进行内存泄露检测和报告.
    DefaultResourceLeak的声明如下

    [java] view plaincopy在CODE上查看代码片派生到我的代码片

    private final class DefaultResourceLeak extends PhantomReference

    //......  
    
    public DefaultResourceLeak(Object referent) {  
        //使用一个静态的引用队列(refQueue)初始化  
        //refQueue是ResourceLeakDecetor的成员变量并由其初始化  
        super(referent, referent != null? refQueue : null);  
        //......  
    }  
    
    //......  

    }

    可见DefaultResourceLeak是个”虚”引用类型,有别于常见的普通的”强”引用,虚引用完全不影响目标对象的垃圾回收,但是会在目标对象被VM垃圾回收时被加入到引用队列中.
    在正常情况下ResourceLeak对象会所监控的资源的引用计数为0时被清理掉(不在被加入引用队列),所以一旦资源的引用计数失常,ResourceLeak对象会被加入到引用队列.例如没有成对调用ByteBuf的retain和relaease方法,导致ByteBuf没有被正常释放(对于DirectByteBuf没有及时释放内存,对于PooledByteBuf没有返回Pool),当引用队列中存在元素时意味着程序中有内存泄露发生.
    ResourceLeakDetector通过检查引用队列来判断是否有内存泄露,并报告跟踪情况.

    踩0 评论0
  • 回答了问题 2019-07-17

    每次hbase做region balance,服务端总是有几秒钟的不可用,怎么减少不可用的时间呢?

    首先,从客户端考虑,其实就是要保证Region下线不可服务期间,读写请求能够在集群恢复后继续,具体可以采取如下措施:

     1) 对于写端,可以将未写入成功的记录,添加到一个客户端缓存中,隔一段时间后交给一个后台线程统一重新提交一次;也可以通过 setAutoFlush(flase, false)保证提交失败的记录不被抛弃,留在客户端writeBuffer中等待下次writeBuffer满了后再次尝试提交,直到提交成功为止。

      2)对于读端,捕获异常后,可以采取休眠一段时间后进行重试等方式。

      3)当然,还可以根据实际情况合理调整hbase.client.retries.number和hbase.client.pause配置选项。

      然后,从服务端考虑,需要分别针对Region Split和Region Balance进行解决:

     1) 由于建表时,我们已经考虑到了数据在不同Region Server上的均匀分布,而且预先在不同Region Server上创建并分配了相同数目的Region,那么考虑到为了集群能够在实际线上环境下提供稳定的服务,可以选择关掉HBase的Region自动 Balance功能,当然关掉后可以选择在每天读写压力小的时候(如凌晨后)触发执行一次Balance操作即可。

      2)接下 来,Region总是被创建,不能被复用的问题该如何解决呢?根本原因是rowkey中包含了timestamp字段,而每时每刻timestamp总是 向上增长的。但是,使用方确实需要能够根据timestamp字段进行顺序scan操作,因此,timestamp字段必须保留。据此,这里给出两种解决 思路:

      一种常用方法是将表按照时间分表,例如按天进行分表,这样可以通过预先建表创建好Region分区,避免实 际读写过程中频 繁触发Region Split等过程,但是这一方法的缺点是每天需要预先建好表,而这一DDL过程可能出现问题进而导致读写出现问题,同时跨天时读写端也需要做出适应,调整 为读写新创建的表。

      其实,我们可以换一种思路,通过修改表的rowkey结构,将timestamp字段改成一 个周期循环的 timestamp,如取timestamp % TS_MODE后的值,其中TS_MODE须大于等于表的TTL时间周期,这样才能保证数据不会被覆盖掉。经过这样改造后,即可实现Region的复用, 避免Region的无限上涨。对于读写端的变更也较小,读写端操作时只需将timestamp字段取模后作为rowkey进行读写,另外,读端需要考虑能 适应scan扫描时处理[startTsMode, endTsMode]和[endTsMode, startTsMode]两种情况。

    踩0 评论0
  • 回答了问题 2019-07-17

    Hbase 在大规模用户画像标签,标签有近百个左右,适合吗?

    可以啊 , 百个维度 算少的 维度了

    踩0 评论0
  • 回答了问题 2019-07-17

    HBase 3.0预计什么时候发版呢,有roadmap吗?

    期待吧

    踩0 评论0
  • 回答了问题 2019-07-17

    hbase 单行列过多数据过大,做get时导致fullGC,有什么好的获取方式吗?

    将任务 分批 , 批量 get API

    踩0 评论0
  • 回答了问题 2019-07-17

    一个客户端 同时访问多个 HBase 集群,怎么实现?

    ssh 配置

    踩0 评论0
  • 回答了问题 2019-07-17

    hbase的数据做分析怎么处理呢?

    Spark读取HBase数据分析 并将结果存入HBase分析结果表 hbase只是数据

    踩0 评论0
  • 回答了问题 2019-07-17

    集群间Hbase数据同步哪种方式最好?

    一.准备阶段

    1.准备2套能正常运行的hbase集群(new cluster:222|oldcluster:226)

    2.2套集群的hosts文件内容都需要包含对方的主机地址

    3.zookeeper可以单独部署2个集群,也可用一个zookeeper集群管理2套hbase集群,就是不能用hbase自带的zookeeper集群做管理

    4.hadoop、hbase等组件版本号保持一致

    二.配置阶段

    1.修改222集群的hbase-site.xml文件

    添加如下内容:

    hbase.replication

    true

    2.在222集群上添加peer

    ./hbase shell add_peer'1','172.16.205.226:2181:/hbase' 执行该命令会报错,但是不影响执行结果,如果不想让其有报错提示,可进入zookeeper将peerid删除,再执行此命令就行了

    3.启动复制 ./hbase shellstart_replication 执行该命令也会报错,不予理会。若想看看状态是否被开启,同样进入zookeeper查看state

    4.创建表 在两套集群创建同样的表(结构需要完全一样)

    5.在226上添加replication属性,并刷新其结构

    disable 'your_table'

    alter 'your_table', {NAME =>'family_name', REPLICATION_SCOPE => '1'}

    enable 'your_table'

    6.测试数据同步

    在226上put一条数据进hbase

    222上将能在随后被scan到

    踩0 评论0
  • 回答了问题 2019-07-17

    oltp适合用hbase么?

    Phoenix

    踩0 评论0
  • 回答了问题 2019-07-17

    [@项籍][¥20]大数据怎么与spring结合

    第一步:MAVEN配置


       
    org.apache.spark
    spark-core_2.11
    1.6.0


    org.apache.spark
    spark-mllib_2.11
    1.6.0


      org.apache.spark
      spark-sql_2.11
       1.6.0


    org.scala-lang
    scala-library
    2.11.0


    org.scala-lang
    scala-compiler
    2.11.0


    org.scala-lang
    scala-reflect
    2.11.0

    第二步:Spring配置


               
                         


               


               


    class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">

     
           
             classpath:jdbc.properties  
             classpath:spark.properties  
             
         
        

    第三步:新增属性文件  spark.properties

    spark.master=local
    spark.url=jdbc:mysql://192.168.0.202:3306/spark?useUnicode=true&characterEncoding=UTF-8
    spark.table=testtable
    spark.username=root
    spark.password=mysql 

    第四步:写代码

    /**
     * 
     */
    package com.harleycorp.service.impl;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import javax.annotation.Resource;

    import org.apache.log4j.Logger;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SaveMode;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;

    import com.harleycorp.pojo.SparkUser;
    import com.harleycorp.service.ISparkUpperService;

    /**
     * @author kevin
     *
     */
    @Service
    public class SparkUpperServiceImpl implements ISparkUpperService {

    private Logger logger =Logger.getLogger(SparkUpperServiceImpl.class);

    @Value("${spark.master}")
    public String master ; // = "local"

    @Value("${spark.url}")
    public String url ;//= "jdbc:mysql://192.168.0.202:3306/spark?useUnicode=true&characterEncoding=UTF-8";

    @Value("${spark.table}")
    public String table ; //= "testtable"

    @Value("${spark.username}")
    public String username ;// = "root";

    //@Value("${spark.password}")
    public String password = "mysql";

    @Resource
    public SQLContext sqlContext;

    @Resource
    public JavaSparkContext sc;

    public Properties getConnectionProperties(){
    Properties connectionProperties = new Properties();
    connectionProperties.setProperty("dbtable",table);
    connectionProperties.setProperty("user",username);//数据库用户
    connectionProperties.setProperty("password",password); //数据库用户密码
    return connectionProperties;
    }

    public String query() {
    logger.info("=======================this url:"+this.url);
    logger.info("=======================this table:"+this.table);
    logger.info("=======================this master:"+this.master);
    logger.info("=======================this username:"+this.username);
    logger.info("=======================this password:"+this.password);

    DataFrame df = null;
    //以下数据库连接内容请使用实际配置地址代替
    df = sqlContext.read().jdbc(url,table, getConnectionProperties());
    df.registerTempTable(table);
    String result = sqlContext.sql("select * from testtable").javaRDD().collect().toString();
    logger.info("=====================spark mysql:"+result);
    return result;
    }

    public String queryByCon(){
    logger.info("=======================this url:"+this.url);
    logger.info("=======================this table:"+this.table);
    logger.info("=======================this master:"+this.master);
    logger.info("=======================this username:"+this.username);
    logger.info("=======================this password:"+this.password);

    DataFrame df = sqlContext.read().jdbc(url, table, new String[]{"password=000000"}, getConnectionProperties());
    String result = df.collectAsList().toString();
    logger.info("=====================spark mysql:"+result);
    return null;
    }

    public void add(){
    List list = new ArrayList();
    SparkUser us = new SparkUser();
    us.setUsername("kevin");
    us.setPassword("000000");
    list.add(us);
    SparkUser us2 = new SparkUser();
    us2.setUsername("Lisa");
    us2.setPassword("666666");
    list.add(us2);

    JavaRDD personsRDD = sc.parallelize(list);
    DataFrame userDf = sqlContext.createDataFrame(personsRDD, SparkUser.class);
    userDf.write().mode(SaveMode.Append).jdbc(url, table, getConnectionProperties());
    }

    }

    第五步:junit调用

    package com.harleycorp.testmybatis;

    import javax.annotation.Resource;

    import org.apache.log4j.Logger;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

    import com.harleycorp.service.ISparkUpperService;

    @RunWith(SpringJUnit4ClassRunner.class) //表示继承了SpringJUnit4ClassRunner类
    @ContextConfiguration(locations = {"classpath:spring-mybatis.xml"})

    public class TestSpark {

    private static Logger logger=Logger.getLogger(TestSpark.class);
    @Resource 
    private  ISparkUpperService sparkUpperService = null;

    @Test
    public void test1(){
    sparkUpperService.query();
    }

    @Test
    public void test2(){  
    sparkUpperService.add();   
    }

    @Test
    public void test3(){
    sparkUpperService.queryByCon();
    }
    }

    第六步:运行

    踩0 评论0
  • 回答了问题 2019-07-17

    Apache安装失败

    权限呢

    踩0 评论0
  • 回答了问题 2019-07-17

    谁会配置服务器

    在哪里?配置

    踩0 评论0
  • 回答了问题 2019-07-17

    Apache安装失败

    你应该说清楚 环境, 版本 ,问题 详细说清楚

    踩0 评论0
  • 回答了问题 2019-07-17

    [@项籍][¥20]大数据怎么与spring结合

    springX

    踩0 评论0
正在加载, 请稍后...
滑动查看更多