This is the raw data at the start: it is a mess, and we only want to pull out some pieces of it, lines like these:
The millisecond-precision times in those lines need to be converted to epoch timestamps.
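For reference, a minimal sketch of just that conversion, assuming the log timestamps look like the ones in the HBase shell output further down (e.g. 2022-05-05 07:21:22,172); the class name and sample value are only illustrative:

import java.text.SimpleDateFormat;

public class TimeDemo {
    public static void main(String[] args) throws Exception {
        // "HH" is the 24-hour clock; ",SSS" matches the comma before the milliseconds.
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
        long millis = fmt.parse("2022-05-05 07:21:22,172").getTime();
        System.out.println(millis); // epoch milliseconds; the exact value depends on the JVM time zone
    }
}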
Spark first filters out the data we want:
package sparkj;

import java.text.SimpleDateFormat;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class sp {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("PeopleInfoCalculator").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        JavaRDD<String> filedata = sc.textFile("file:///root/555.txt");

        // Keep only lines that start with a digit (i.e. lines that begin with a date),
        // then rewrite each line as "<epoch millis> <level> <class>".
        JavaRDD<String> data2 = filedata.filter(f -> f.matches("[0-9].*"))
                .map(f -> String2Date(f.split(" ")[0], f.split(" ")[1])
                        + " " + f.split(" ")[2]
                        + " " + f.split(" ")[3].split(":")[0]);

        data2.foreach(f -> System.err.println(f));
        // data2.saveAsTextFile("file:///root/555copy.txt");

        // Word count over the second comma-separated field (used later):
        // JavaPairRDD<String, Integer> rdd6 = filedata.mapToPair(f -> new Tuple2<>(f.split(",")[1], 1));
        // JavaPairRDD<String, Integer> rdd7 = rdd6.reduceByKey((x, y) -> x + y);
        // rdd7.foreach(f -> System.err.println(f));

        sc.close();
    }

    // Parse "yyyy-MM-dd HH:mm:ss,SSS" into epoch milliseconds.
    public static long String2Date(String date, String time) throws Exception {
        String newdate = date + " " + time;
        SimpleDateFormat s = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); // HH = 24-hour clock
        return s.parse(newdate).getTime();
    }
}
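One caveat about the commented-out saveAsTextFile line: Spark writes a directory named 555copy.txt full of part-* files, not a single text file, while the HBase loader further down opens /root/555copy.txt with a plain FileReader. A hedged workaround is to collapse the RDD to one partition before saving (the loader then has to point at the part-00000 file inside that directory, or the parts can simply be concatenated by hand):

// One partition -> a single part-00000 file inside the output directory.
// Fine here because the filtered log is small; not advisable for large data.
data2.coalesce(1).saveAsTextFile("file:///root/555copy.txt");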
The result looks like this:
Now open HBase and start working on it; start it up first:
root@master:/opt/hbase-1.3.3/bin# ./start-hbase.sh
starting master, logging to /opt/hbase-1.3.3/bin/../logs/hbase-root-master-master.out
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
root@master:/opt/hbase-1.3.3/bin# ./hbase shell
2022-05-05 07:21:22,172 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.3.3, ra75a458e0c8f3c60db57f30ec4b06a606c9027b4, Fri Dec 14 16:02:53 PST 2018
hbase(main):001:0> list
TABLE
HBase word count
The standalone, HBase-only version first.
Main function:
package test1;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;

public class MainHB1 {

    private static String getType(Object a) {
        return a.getClass().toString();
    }

    public static void main(String[] args) throws Exception {
        // One-off table creation (run once, then keep it commented out):
        // try {
        //     HBaseOp ho = new HBaseOp();
        //     ho.createTable("logData1", new String[] { "log" });
        //     System.out.println("ok");
        // } catch (Exception e) {
        //     e.printStackTrace();
        // }

        // Count ERROR and FATAL rows and report the most frequent class for each level.
        try {
            HBaseOp ho = new HBaseOp();
            String tab = "logData1";
            // ho.filterByRowFilter(tab, "class", "info");
            // ho.filterBySingleColumnValueFilter(tab, "log", "level", "");
            System.out.println("ERROR total:");
            ho.filterBySingleColumnValueFilter(tab, "log", "level", "ERROR");
            System.out.println("FATAL total:");
            ho.filterBySingleColumnValueFilter(tab, "log", "level", "FATAL");
            System.out.println("ok");
        } catch (Exception e) {
            e.printStackTrace();
        }

        // Load the pre-processed log lines ("<epoch millis> <level> <class>") into HBase.
        File file = new File("/root/555copy.txt");
        BufferedReader br = new BufferedReader(new FileReader(file));
        String s;
        String[] column = new String[] { "level", "class" };
        int i = 0;
        while ((s = br.readLine()) != null) {
            String[] line = s.split(" ");
            long t = Long.parseLong(line[0]);
            // Reverse the timestamp so that newer rows get smaller row keys.
            String time = String.valueOf(Long.MAX_VALUE - t);
            try {
                HBaseOp ho = new HBaseOp();
                String tab = "logData1";
                String name = "log";
                // Uncomment to actually insert the rows:
                // ho.put(tab, time, name, column[0], line[1]);
                // ho.put(tab, time, name, column[1], line[2]);
                i++;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        br.close();
        System.out.println(i + " lines processed");
        System.out.println("ok");
    }
}
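A quick note on the row key: Long.MAX_VALUE - t flips the timestamp so that newer log lines get numerically smaller keys, and because all of these keys have the same number of digits, HBase's lexicographic row order then returns the newest rows first in a scan. A tiny sketch (the sample epoch values are made up for illustration):

public class RowKeyDemo {
    public static void main(String[] args) {
        long earlier = 1651735282172L; // assumed sample, roughly 2022-05-05 07:21:22,172 UTC
        long later = 1651735290000L;   // a slightly later event
        String keyEarlier = String.valueOf(Long.MAX_VALUE - earlier);
        String keyLater = String.valueOf(Long.MAX_VALUE - later);
        // Both keys are 19 digits, so string comparison matches numeric comparison:
        // the later event has the smaller key and therefore sorts first.
        System.out.println(keyEarlier.compareTo(keyLater) > 0); // true
    }
}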
The methods of the helper class:
package test1;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseOp {

    // (unused helper type)
    class P {
        String row;
        String[] colum;
    }

    Configuration conf = HBaseConfiguration.create();

    // Create a table with the given column families (TTL 10000 seconds).
    public void createTable(String strTab, String[] arrCf) throws Exception {
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (admin.tableExists(strTab)) {
            System.out.println("Table " + strTab + " exists.");
        } else {
            HTableDescriptor fs = new HTableDescriptor(strTab);
            for (String cf : arrCf) {
                HColumnDescriptor ff = new HColumnDescriptor(cf);
                ff.setTimeToLive(10000);
                fs.addFamily(ff);
            }
            admin.createTable(fs);
        }
        admin.close();
    }

    public void deleteTable(String strTab) throws Exception {
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists(strTab)) {           // table does not exist
            System.out.println(strTab + " does not exist");
        } else if (admin.isTableEnabled(strTab)) {  // table is enabled: disable it before deleting
            admin.disableTable(strTab);
            admin.deleteTable(strTab);
            System.out.println(strTab + " deleted");
        } else {                                    // table is already disabled
            admin.deleteTable(strTab);
            System.out.println(strTab + " deleted");
        }
        admin.close();
    }

    // Insert several rows into the "scores" table in one batch.
    public void mulPut() throws Exception {
        HTable table = new HTable(conf, "scores");
        List<Put> puts = new ArrayList<Put>();

        Put put1 = new Put(Bytes.toBytes("Tom"));
        put1.add(Bytes.toBytes("grade"), Bytes.toBytes(""), Bytes.toBytes("1"));
        put1.add(Bytes.toBytes("course"), Bytes.toBytes("math"), Bytes.toBytes("990"));
        puts.add(put1);

        Put put2 = new Put(Bytes.toBytes("John"));
        put2.add(Bytes.toBytes("grade"), Bytes.toBytes(""), Bytes.toBytes("2"));
        put2.add(Bytes.toBytes("course"), Bytes.toBytes("Chinese"), Bytes.toBytes("99"));
        puts.add(put2);

        Put put3 = new Put(Bytes.toBytes("gyy"));
        put3.add(Bytes.toBytes("grade"), Bytes.toBytes(""), Bytes.toBytes("12"));
        put3.add(Bytes.toBytes("course"), Bytes.toBytes("math"), Bytes.toBytes("1000"));
        put3.add(Bytes.toBytes("course"), Bytes.toBytes("geo"), Bytes.toBytes("1000"));
        put3.add(Bytes.toBytes("course"), Bytes.toBytes("语文"), Bytes.toBytes("1000"));
        puts.add(put3);

        table.put(puts);
        table.close();
    }

    // Write a single cell.
    public void put(String tablename, String row, String cf, String column, String data) throws Exception {
        HTable table = new HTable(conf, tablename);
        Put p = new Put(Bytes.toBytes(row));
        p.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(data));
        table.put(p);
        table.close();
    }

    // Count rows whose <strF>:<strC> equals strClass, group them by the "class" column,
    // and report the most frequent class.
    public void filterBySingleColumnValueFilter(String tablename, String strF, String strC, String strClass)
            throws Exception {
        HTable table = new HTable(conf, tablename);
        Scan s = new Scan();
        SingleColumnValueFilter sf = new SingleColumnValueFilter(Bytes.toBytes(strF),
                Bytes.toBytes(strC), CompareOp.EQUAL, Bytes.toBytes(strClass));
        s.setFilter(sf);

        ResultScanner rs = table.getScanner(s);
        HashMap<String, Integer> hashMap = new HashMap<>();
        int i = 0;
        for (Result r : rs) {
            i = i + 1;
            byte[] value2 = r.getValue(Bytes.toBytes(strF), Bytes.toBytes("class"));
            String cls = Bytes.toString(value2);
            if (!hashMap.containsKey(cls)) {
                hashMap.put(cls, 1);                     // first time we see this class
            } else {
                hashMap.put(cls, hashMap.get(cls) + 1);  // seen before: increment the count
            }
        }
        System.out.println(hashMap);
        System.out.println("Most frequent class: " + getProcessCdByName(hashMap));
        System.out.println(strClass + " total = " + i);
        rs.close();
        table.close();
    }

    // Return the key with the largest value in the map.
    public static String getProcessCdByName(HashMap<String, Integer> processMap) {
        int max = 0;
        for (Integer in : processMap.values()) {
            max = Math.max(max, in);
        }
        String result = null;
        Set<Map.Entry<String, Integer>> set = processMap.entrySet();
        for (Map.Entry<String, Integer> entry : set) {
            if (entry.getValue() == max) {
                result = entry.getKey();
                break;
            }
        }
        return result;
    }

    // Scan rows whose <cf>:<C> is either ERROR or FATAL and print them.
    public void filterBySingleColumnValueFilter(String tablename, String cf, String C) throws Exception {
        HTable table = new HTable(conf, tablename);
        Scan s = new Scan();
        SingleColumnValueFilter sf = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(C),
                CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("ERROR")));
        SingleColumnValueFilter sf2 = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(C),
                CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("FATAL")));
        FilterList lst = new FilterList(FilterList.Operator.MUST_PASS_ONE); // pass if either filter matches
        lst.addFilter(sf);
        lst.addFilter(sf2);
        s.setFilter(lst);

        ResultScanner rs = table.getScanner(s);
        int i = 0;
        for (Result r : rs) {
            byte[] row = r.getRow();
            byte[] value = r.getValue(Bytes.toBytes(cf), Bytes.toBytes(C));
            byte[] value2 = r.getValue(Bytes.toBytes(cf), Bytes.toBytes("class"));
            System.out.println("--------------------------");
            System.out.println("Filter: " + Bytes.toString(row) + " is in " + C + " "
                    + Bytes.toString(value) + ", class " + Bytes.toString(value2));
            i = i + 1;
        }
        System.out.println(i);
        rs.close();
        table.close();
    }

    // Variant of the counting filter above that only prints the per-class totals.
    public void filterBySingleColumnValueFilter2(String tablename, String strF, String strC, String strClass)
            throws Exception {
        HTable table = new HTable(conf, tablename);
        Scan s = new Scan();
        SingleColumnValueFilter sf = new SingleColumnValueFilter(Bytes.toBytes(strF),
                Bytes.toBytes(strC), CompareOp.EQUAL, Bytes.toBytes(strClass));
        s.setFilter(sf);

        ResultScanner rs = table.getScanner(s);
        HashMap<String, Integer> hashMap = new HashMap<>();
        int i = 0;
        for (Result r : rs) {
            i = i + 1;
            byte[] value2 = r.getValue(Bytes.toBytes(strF), Bytes.toBytes("class"));
            String cls = Bytes.toString(value2);
            if (!hashMap.containsKey(cls)) {
                hashMap.put(cls, 1);
            } else {
                hashMap.put(cls, hashMap.get(cls) + 1);
            }
        }
        System.out.println(hashMap);
        System.out.println("i=" + i);
        rs.close();
        table.close();
    }

    // Read one cell by row key.
    public void get(String tablename, String row, String info, String name) throws Exception {
        HTable table = new HTable(conf, tablename);
        Get g = new Get(Bytes.toBytes(row));
        Result result = table.get(g);
        byte[] val = result.getValue(Bytes.toBytes(info), Bytes.toBytes(name));
        System.out.println(info + " " + name + " Values = " + Bytes.toString(val));
        table.close();
    }

    // Scan a row-key range and print one column from each row.
    public void scan(String tablename, String cf, String column) throws Exception {
        HTable table = new HTable(conf, tablename);
        Scan s = new Scan();
        s.setStartRow(Bytes.toBytes("0"));
        s.setStopRow(Bytes.toBytes("g"));
        // s.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
        ResultScanner res = table.getScanner(s);
        for (Result r : res) {
            byte[] row = r.getRow();
            byte[] val = r.getValue(Bytes.toBytes(cf), Bytes.toBytes(column));
            System.out.println("Scan: " + Bytes.toString(row) + " values is " + Bytes.toString(val));
        }
        res.close();
        table.close();
    }

    // Delete an entire row (the column-level delete is left commented out).
    public void delete(String tablename, String row, String cf, String column, String data) throws Exception {
        HTable table = new HTable(conf, tablename);
        List<Delete> ls = new ArrayList<Delete>();
        Delete p = new Delete(Bytes.toBytes(row));
        ls.add(p);
        // p.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column));
        table.delete(ls);
        table.close();
    }
}
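The HTable and HBaseAdmin constructors used above are deprecated in HBase 1.x; they still work against 1.3.3, but the Connection-based API is the documented replacement. A minimal sketch of the same single-cell put with that API (table and family names follow the example above; the row key is a placeholder):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PutDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Connection and Table are Closeable, so try-with-resources cleans them up.
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("logData1"))) {
            Put p = new Put(Bytes.toBytes("some-row-key")); // row key is a placeholder
            p.addColumn(Bytes.toBytes("log"), Bytes.toBytes("level"), Bytes.toBytes("ERROR"));
            table.put(p);
        }
    }
}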
The data inserted into the table looks about right.
Alternatively, just filter this with Spark the quick-and-dirty way; the data comes straight from the statement below.
First, in the HBase main function, call this:
ho.filterBySingleColumnValueFilter1(tab, "log", "level", "ERROR");
That call produces output; copy it into a file, then run a Spark word count over it.
The method in the helper class looks like this:

public void filterBySingleColumnValueFilter1(String tablename, String strF, String strC, String strClass)
        throws Exception {
    HTable table = new HTable(conf, tablename);
    Scan s = new Scan();
    SingleColumnValueFilter sf = new SingleColumnValueFilter(Bytes.toBytes(strF),
            Bytes.toBytes(strC), CompareOp.EQUAL, Bytes.toBytes(strClass));
    s.setFilter(sf);

    ResultScanner rs = table.getScanner(s);
    for (Result r : rs) {
        byte[] row = r.getRow();
        byte[] value = r.getValue(Bytes.toBytes(strF), Bytes.toBytes(strC));
        byte[] value2 = r.getValue(Bytes.toBytes(strF), Bytes.toBytes("class"));
        // One line per matching row: "Filter: <rowkey> is in level ERROR, class <class>"
        System.out.println("Filter: " + Bytes.toString(row) + " is in " + strC + " "
                + Bytes.toString(value) + ", class " + Bytes.toString(value2));
    }
    rs.close();
    table.close();
}
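If copying the console output by hand gets tedious, a variant of the same method could write the matching rows straight to a local file in the same "..., class X" shape, so the Spark job below can still key on split(",")[1]. This is only a sketch meant to live inside HBaseOp; the method name and output path are assumptions:

public void filterToFile(String tablename, String strF, String strC,
                         String strClass, String outPath) throws Exception {
    HTable table = new HTable(conf, tablename);
    Scan s = new Scan();
    s.setFilter(new SingleColumnValueFilter(Bytes.toBytes(strF),
            Bytes.toBytes(strC), CompareOp.EQUAL, Bytes.toBytes(strClass)));
    ResultScanner rs = table.getScanner(s);
    // java.io types are fully qualified so no extra imports are needed in HBaseOp.
    try (java.io.PrintWriter out = new java.io.PrintWriter(new java.io.FileWriter(outPath))) {
        for (Result r : rs) {
            String level = Bytes.toString(r.getValue(Bytes.toBytes(strF), Bytes.toBytes(strC)));
            String cls = Bytes.toString(r.getValue(Bytes.toBytes(strF), Bytes.toBytes("class")));
            // Same "..., class X" layout as the console output above.
            out.println("Filter: " + Bytes.toString(r.getRow()) + " is in " + strC + " "
                    + level + ", class " + cls);
        }
    }
    rs.close();
    table.close();
}

Usage would then be something like (path is an assumption): ho.filterToFile("logData1", "log", "level", "ERROR", "/root/error_rows.txt");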
package sparkj;

import java.text.SimpleDateFormat;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class sp {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("PeopleInfoCalculator").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        JavaRDD<String> filedata = sc.textFile("file:///root/555.txt");

        // The earlier pre-processing step, kept for reference:
        // JavaRDD<String> data2 = filedata.filter(f -> f.matches("[0-9].*"))
        //         .map(f -> String2Date(f.split(" ")[0], f.split(" ")[1])
        //                 + " " + f.split(" ")[2]
        //                 + " " + f.split(" ")[3].split(":")[0]);
        // data2.foreach(f -> System.err.println(f));
        // data2.saveAsTextFile("file:///root/555copy.txt");

        // Word count: the second comma-separated field of each line ("..., class X")
        // becomes the key, and the ones are summed per key.
        JavaPairRDD<String, Integer> rdd6 = filedata.mapToPair(f -> new Tuple2<>(f.split(",")[1], 1));
        JavaPairRDD<String, Integer> rdd7 = rdd6.reduceByKey((x, y) -> x + y);
        rdd7.foreach(f -> System.err.println(f));

        sc.close();
    }

    // Parse "yyyy-MM-dd HH:mm:ss,SSS" into epoch milliseconds.
    public static long String2Date(String date, String time) throws Exception {
        String newdate = date + " " + time;
        SimpleDateFormat s = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
        return s.parse(newdate).getTime();
    }
}
The final result: