An HBase log-processing problem

Summary: an HBase log-processing problem

33.1.png

This is the raw data. It is messy, and we only need to extract some of it, similar to the following:

33.2.png

The millisecond-precision time fields need to be converted to epoch timestamps.
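As a side note, the same conversion can also be done with the java.time API instead of SimpleDateFormat. This is just a minimal sketch, assuming the timestamps follow the yyyy-MM-dd HH:mm:ss,SSS pattern used by the Spark job below (the class name is made up):

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TimeDemo {
    // same pattern as the Spark job below
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // parse the date and time fields of a log line into epoch milliseconds
    public static long toEpochMillis(String date, String time) {
        // the zone is an assumption; SimpleDateFormat also uses the JVM default zone
        return LocalDateTime.parse(date + " " + time, FMT)
                .atZone(ZoneId.systemDefault())
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2022-05-05", "07:21:22,172"));
    }
}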

33.3.png

First, use Spark to filter out the lines we want:

package sparkj;

import java.text.SimpleDateFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class sp {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("PeopleInfoCalculator").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaRDD<String> filedata = sc.textFile("file:///root/555.txt");
        // keep only lines that start with a digit (i.e. lines beginning with a date),
        // then rebuild each line as "<epoch millis> <level> <class>"
        JavaRDD<String> data2 = filedata.filter(f -> f.matches("[0-9].*"))
                .map(f -> String2Date(f.split(" ")[0], f.split(" ")[1]) + " " + f.split(" ")[2] + " " + f.split(" ")[3].split(":")[0]);
        data2.foreach(f -> System.err.println(f));
        // data2.saveAsTextFile("file:///root/555copy.txt");
        // JavaPairRDD<String, Integer> rdd6 = filedata.mapToPair(f -> new Tuple2<>(f.split(",")[1], 1));
        // JavaPairRDD<String, Integer> rdd7 = rdd6.reduceByKey((x, y) -> x + y);
        // rdd7.foreach(f -> System.err.println(f));
    }

    // parse "yyyy-MM-dd HH:mm:ss,SSS" into epoch milliseconds
    public static long String2Date(String date, String time) throws Exception {
        String newdate = date + " " + time;
        SimpleDateFormat s = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
        return s.parse(newdate).getTime();
    }
}
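If you want the filtered lines written to disk instead of printed, a small sketch based on the commented-out saveAsTextFile call above (the coalesce to a single partition is my addition; note that saveAsTextFile creates a directory containing a part-00000 file, not a plain file):

// collect everything into one partition so only a single part file is written
data2.coalesce(1).saveAsTextFile("file:///root/555copy.txt");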

The result is as follows:

33.4.png

Now bring up HBase and work with it. Start it first:

root@master:/opt/hbase-1.3.3/bin# ./start-hbase.sh
starting master, logging to /opt/hbase-1.3.3/bin/../logs/hbase-root-master-master.out
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
root@master:/opt/hbase-1.3.3/bin# ./hbase shell
2022-05-05 07:21:22,172 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.3.3, ra75a458e0c8f3c60db57f30ec4b06a606c9027b4, Fri Dec 14 16:02:53 PST 2018
hbase(main):001:0> list
TABLE

HBase word count

The pure-HBase approach

Main class:

package test1;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;

public class MainHB1 {

    private static String getType(Object a) {
        return a.getClass().toString();
    }

    public static void main(String[] args) throws Exception {
        // one-time setup: create table "logData1" with a single column family "log"
        // try {
        //     HBaseOp ho = new HBaseOp();
        //     String tab = "logData1";
        //     String[] name = new String[1];
        //     name[0] = "log";
        //     ho.createTable(tab, name);
        //     System.out.println("ok");
        // } catch (Exception e) {
        //     e.printStackTrace();
        // }

        // query step: count ERROR and FATAL records by class
        try {
            HBaseOp ho = new HBaseOp();
            String tab = "logData1";
            String[] name = new String[1];
            name[0] = "log";
            // ho.filterByRowFilter(tab, "class", "info");
            // ho.filterBySingleColumnValueFilter(tab, "log", "level", "");
            System.out.println("ERROR total:");
            ho.filterBySingleColumnValueFilter(tab, "log", "level", "ERROR");
            System.out.println("FATAL total:");
            ho.filterBySingleColumnValueFilter(tab, "log", "level", "FATAL");
            System.out.println("ok");
        } catch (Exception e) {
            e.printStackTrace();
        }

        // insert step: read the pre-processed log lines and put each one into HBase
        // (the put calls are commented out here)
        File file = new File("/root/555copy.txt");
        System.out.println("inserting data...");
        BufferedReader br = new BufferedReader(new FileReader(file));
        String s;
        String[] column = new String[2];
        column[0] = "level";
        column[1] = "class";
        int i = 0;
        while ((s = br.readLine()) != null) {
            String[] line = s.split(" ");
            long t = Long.parseLong(line[0]);
            // reverse-timestamp row key: newer log lines get smaller keys and sort first
            String time = String.valueOf(Long.MAX_VALUE - t);
            try {
                HBaseOp ho = new HBaseOp();
                String tab = "logData1";
                String name = "log";
                // ho.put(tab, time, name, column[0], line[1]);
                // ho.put(tab, time, name, column[1], line[2]);
            } catch (Exception e) {
                e.printStackTrace();
            }
            i++; // count processed lines
        }
        br.close();
        System.out.println(i);
        System.out.println("ok");
    }
}
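A note on the row key: String.valueOf(Long.MAX_VALUE - t) is the usual reverse-timestamp trick. HBase keeps rows sorted lexicographically by key, and since all of these values have the same number of digits, the newest log line ends up with the smallest key and is returned first by a scan. A tiny standalone check (the timestamp is a made-up example):

public class RowKeyDemo {
    public static void main(String[] args) {
        long older = 1651735282172L;   // hypothetical epoch millis
        long newer = older + 1000;     // one second later
        String kOlder = String.valueOf(Long.MAX_VALUE - older);
        String kNewer = String.valueOf(Long.MAX_VALUE - newer);
        // both keys have 19 digits, so string order matches numeric order;
        // the newer event gets the lexicographically smaller key
        System.out.println(kNewer.compareTo(kOlder) < 0);   // prints true
    }
}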

The helper class with the HBase methods:

package test1;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
public class HBaseOp {
  class P{
  String row;
  String[] colum;
  }
Configuration conf=HBaseConfiguration.create();
public void createTable(String strTab,String[] arrCf) throws Exception{
  HBaseAdmin admin=new HBaseAdmin(conf);
  if(admin.tableExists(strTab)) {
  System.out.println("Table"+strTab+"exists.");
  }
  else {
  HTableDescriptor fs=new HTableDescriptor(strTab);
  for(String cf:arrCf) {
    HColumnDescriptor ff=new HColumnDescriptor(cf);
    ff.setTimeToLive(10000); // column-family TTL in seconds
    fs.addFamily(ff);
  }
  admin.createTable(fs);
  }
  admin.close();
  }
public void deleteTable(String strTab)throws Exception
{
    HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists(strTab)) { // check whether the table exists
            System.out.println(strTab + " does not exist");
        }
        else if (admin.isTableEnabled(strTab)) { // if the table is enabled, disable it before deleting
            admin.disableTable(strTab);
            admin.deleteTable(strTab);
            System.out.println(strTab + " deleted");
        }
        else
        {
            admin.deleteTable(strTab);
            System.out.println(strTab + " deleted");
        }
        admin.close();
}
//
public void mulPut() throws Exception {
    HTable table = new HTable(conf, "scores");
    // create a list to hold the Put instances
    List<Put> puts = new ArrayList<Put>();
    // add the first Put to the list
    Put put1 = new Put(Bytes.toBytes("Tom"));
    put1.add(Bytes.toBytes("grade"), Bytes.toBytes(""), Bytes.toBytes("1"));
    put1.add(Bytes.toBytes("course"), Bytes.toBytes("math"), Bytes.toBytes("990"));
    puts.add(put1);
    // add the second Put to the list
    Put put2 = new Put(Bytes.toBytes("John"));
    put2.add(Bytes.toBytes("grade"), Bytes.toBytes(""), Bytes.toBytes("2"));
    put2.add(Bytes.toBytes("course"), Bytes.toBytes("Chinese"), Bytes.toBytes("99"));
    puts.add(put2);
    // add the third Put to the list
    Put put3 = new Put(Bytes.toBytes("gyy"));
    put3.add(Bytes.toBytes("grade"), Bytes.toBytes(""), Bytes.toBytes("12"));
    put3.add(Bytes.toBytes("course"), Bytes.toBytes("math"), Bytes.toBytes("1000"));
    put3.add(Bytes.toBytes("course"), Bytes.toBytes("geo"), Bytes.toBytes("1000"));
    put3.add(Bytes.toBytes("course"), Bytes.toBytes("语文"), Bytes.toBytes("1000"));
    puts.add(put3);
    // write all the rows to HBase in one batch
    table.put(puts);
    table.close();
}
//
public void put(String tablename,String row,String cf,String column,String data) throws Exception{
  HTable table=new HTable(conf,tablename);
  Put p=new Put(Bytes.toBytes(row));
  p.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(data));
  table.put(p);
  table.close();
}
public void filterBySingleColumnValueFilter(String tablename, String strF, String strC, String strClass) throws Exception
{
    HTable table = new HTable(conf, tablename);
    Scan s = new Scan();
    SingleColumnValueFilter sf =
        new SingleColumnValueFilter(Bytes.toBytes(strF),
            Bytes.toBytes(strC), CompareOp.EQUAL,
            Bytes.toBytes(strClass));
    s.setFilter(sf);
    ResultScanner rs = table.getScanner(s);
    HashMap<String, Integer> hashMap = new HashMap<>();
    int i = 0;
    for (Result r : rs) {
        i = i + 1;
        byte[] row = r.getRow();
        byte[] value = r.getValue(Bytes.toBytes(strF), Bytes.toBytes(strC));
        byte[] value2 = r.getValue(Bytes.toBytes(strF), Bytes.toBytes("class"));
        if (!hashMap.containsKey(Bytes.toString(value2))) {
            // the class is not in the map yet: start its count at 1
            hashMap.put(Bytes.toString(value2), 1);
        } else {
            // the class is already in the map: increment its count
            int b = hashMap.get(Bytes.toString(value2));
            hashMap.put(Bytes.toString(value2), b + 1);
        }
        // System.out.println("Filter: " + Bytes.toString(row) + " is in " + strC + " " + Bytes.toString(value) + ", class " + Bytes.toString(value2));
    }
    System.out.println(hashMap);
    System.out.println("most frequent class: " + getProcessCdByName(hashMap));
    System.out.println("total matches = " + i);
    rs.close();
    table.close();
}
public static String getProcessCdByName(HashMap<String, Integer> processMap) { // find the key with the largest count
  int max=0;
  for (Integer in : processMap.values()) {
  System.err.println(in);
  max=Math.max(max, in);
  }
  String result = null;
  Set<Map.Entry<String, Integer>> set = processMap.entrySet();
  for(Map.Entry<String, Integer> entry : set){
  if(entry.getValue()==max){
    result = entry.getKey();
    break;
  }
  }
  return result;
}
public void filterBySingleColumnValueFilter(String tablename, String cf, String C) throws Exception
{
    HTable table = new HTable(conf, tablename);
    Scan s = new Scan();
    SingleColumnValueFilter sf = 
      new SingleColumnValueFilter(Bytes.toBytes(cf), 
        Bytes.toBytes(C), CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("ERROR")));
    SingleColumnValueFilter sf2 = 
      new SingleColumnValueFilter(Bytes.toBytes(cf), 
        Bytes.toBytes(C), CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("FATAL")));
    FilterList lst= new FilterList(
      FilterList.Operator.MUST_PASS_ONE);
    lst.addFilter(sf);
    lst.addFilter(sf2);
    s.setFilter(lst);
    ResultScanner rs = table.getScanner(s);
       int i =0;
    for (Result r : rs) {
        byte[] row = r.getRow();
        byte[] value = r.getValue(Bytes.toBytes(cf), Bytes.toBytes(C));
        byte[] value2 = r.getValue(Bytes.toBytes(cf), Bytes.toBytes("class"));
        System.out.println("--------------------------");
        System.out.println("Filter: " + Bytes.toString(row) + " is in " + C + " " + Bytes.toString(value)+", class " + Bytes.toString(value2));
        i=i+1;
    }
    System.out.println(i);
    rs.close();
    table.close();
}
// variant of the filter above that prints only the per-class counts and the raw total
public void filterBySingleColumnValueFilter2(String tablename, String strF, String strC, String strClass) throws Exception
{
    HTable table = new HTable(conf, tablename);
    Scan s = new Scan();
    SingleColumnValueFilter sf =
        new SingleColumnValueFilter(Bytes.toBytes(strF),
            Bytes.toBytes(strC), CompareOp.EQUAL,
            Bytes.toBytes(strClass));
    s.setFilter(sf);
    ResultScanner rs = table.getScanner(s);
    HashMap<String, Integer> hashMap = new HashMap<>();
    int i = 0;
    for (Result r : rs) {
        i = i + 1;
        byte[] row = r.getRow();
        byte[] value = r.getValue(Bytes.toBytes(strF), Bytes.toBytes(strC));
        byte[] value2 = r.getValue(Bytes.toBytes(strF), Bytes.toBytes("class"));
        if (!hashMap.containsKey(Bytes.toString(value2))) {
            // the class is not in the map yet: start its count at 1
            hashMap.put(Bytes.toString(value2), 1);
        } else {
            // the class is already in the map: increment its count
            int b = hashMap.get(Bytes.toString(value2));
            hashMap.put(Bytes.toString(value2), b + 1);
        }
        // System.out.println("Filter: " + Bytes.toString(row) + " is in " + strC + " " + Bytes.toString(value) + ", class " + Bytes.toString(value2));
    }
    System.out.println(hashMap);
    System.out.println("i=" + i);
    rs.close();
    table.close();
}
public void get(String tablename,String row,String info,String name) throws Exception{
  HTable table=new HTable(conf,tablename);
  Get g=new Get(Bytes.toBytes(row));
  Result result = table.get(g);
  byte[] val = result.getValue(Bytes.toBytes(info),Bytes.toBytes(name));
  System.out.println(info+" "+name+" "+"Values =" + Bytes.toString(val));
}
public void scan(String tablename,String cf,String column) throws Exception{
  HTable table=new HTable(conf,tablename);
  Scan s=new Scan();
  s.setStartRow(Bytes.toBytes("0"));
     s.setStopRow(Bytes.toBytes("g"));
    // s.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
  ResultScanner res=table.getScanner(s);
  for(Result r:res) {
   byte[] row=r.getRow();
   byte[] val=r.getValue(Bytes.toBytes(cf), Bytes.toBytes(column));
   System.out.println("Scan:"+Bytes.toString(row)
   +"   values is "+Bytes.toString(val));
  }
  res.close();
  table.close();
}
public void delete(String tablename,String row,String cf,String column,String data) throws Exception{
  HTable table=new HTable(conf,tablename);
  List<Delete> ls=new ArrayList<Delete>();
  Delete p=new Delete(Bytes.toBytes(row));
  ls.add(p);
  //p.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(data));
  table.delete(ls);
  table.close();
}
}
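One caveat: HTable, HBaseAdmin and Put.add() still work against HBase 1.3 but are deprecated there. For reference, a minimal sketch of the same kind of put using the Connection/Table API (table, family and column names are copied from above; this is a sketch, not a drop-in replacement for the whole class):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PutWithConnection {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("logData1"))) {
            // the row key and values here are placeholders; the real ones come from the log file
            Put p = new Put(Bytes.toBytes("some-row-key"));
            p.addColumn(Bytes.toBytes("log"), Bytes.toBytes("level"), Bytes.toBytes("ERROR"));
            p.addColumn(Bytes.toBytes("log"), Bytes.toBytes("class"), Bytes.toBytes("some.Class"));
            table.put(p);
        }
    }
}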

The data inserted into the table looks right:

33.5.png

33.6.png

Alternatively, let Spark do the counting in a quick-and-dirty way; the output below comes straight from the following call.

First, run this in the HBase main class:

ho.filterBySingleColumnValueFilter(tab, "log","level","ERROR");

33.7.png


This prints one line per matching record. Copy that output into a file, then do the word count with Spark. Each printed line ends with ", class <name>", which is why the Spark job below splits on "," and takes the second field.

The method in the helper class looks like this:
public void filterBySingleColumnValueFilter1(String tablename, String strF, String strC, String strClass) throws Exception
{
    HTable table = new HTable(conf, tablename);
    Scan s = new Scan();
    SingleColumnValueFilter sf = 
            new SingleColumnValueFilter(Bytes.toBytes(strF), 
                    Bytes.toBytes(strC), CompareOp.EQUAL, 
                    Bytes.toBytes(strClass));
    s.setFilter(sf);
    ResultScanner rs = table.getScanner(s);
    for (Result r : rs) {
        byte[] row = r.getRow();
        byte[] value = r.getValue(Bytes.toBytes(strF), Bytes.toBytes(strC));
        byte[] value2 = r.getValue(Bytes.toBytes(strF), Bytes.toBytes("class"));
        System.out.println("Filter: " + Bytes.toString(row) + " is in " + strC + " " + Bytes.toString(value)+", class " + Bytes.toString(value2));
    }
    rs.close();
    table.close();
}

33.8.png

package sparkj;

import java.text.SimpleDateFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class sp {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("PeopleInfoCalculator").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaRDD<String> filedata = sc.textFile("file:///root/555.txt");
        // JavaRDD<String> data2 = filedata.filter(f -> f.matches("[0-9].*"))
        //         .map(f -> String2Date(f.split(" ")[0], f.split(" ")[1]) + " " + f.split(" ")[2] + " " + f.split(" ")[3].split(":")[0]);
        // data2.foreach(f -> System.err.println(f));
        // data2.saveAsTextFile("file:///root/555copy.txt");

        // word count: the copied filter output ends with ", class <name>", so splitting
        // on "," and taking field [1] gives the class name as the key
        JavaPairRDD<String, Integer> rdd6 = filedata.mapToPair(f -> new Tuple2<>(f.split(",")[1], 1));
        JavaPairRDD<String, Integer> rdd7 = rdd6.reduceByKey((x, y) -> x + y);
        rdd7.foreach(f -> System.err.println(f));
    }

    // parse "yyyy-MM-dd HH:mm:ss,SSS" into epoch milliseconds
    public static long String2Date(String date, String time) throws Exception {
        String newdate = date + " " + time;
        SimpleDateFormat s = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
        return s.parse(newdate).getTime();
    }
}
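If you also want the single most frequent class on the Spark side (the counterpart of getProcessCdByName in the HBase path), a small sketch building on rdd7 from the listing above:

// swap to (count, class), sort descending by count, and take the top pair
Tuple2<Integer, String> top = rdd7
        .mapToPair(t -> new Tuple2<>(t._2, t._1))
        .sortByKey(false)
        .first();
System.out.println("most frequent class: " + top._2 + " (" + top._1 + ")");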

The final result:

33.9.png
