hbase 报文 处理 逻辑

简介:     报文格式:  每隔一个小时 出现一个文件类型 报文 ,所以 我们的处理思路是,一个小时做一次处理。   import java.io.FileInputStream; import java.

 

 

报文格式:

 

每隔一个小时 出现一个文件类型 报文 ,所以 我们的处理思路是,一个小时做一次处理。

 

import java.io.FileInputStream;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
 

import com.cmcc.aoi.util.OsUtil;

public class HbaseStarter {

	public static void main(String[] args) throws  Exception {
		Properties properties=new Properties();
		String config="";
		if(!OsUtil.isLinux())
			config= "D:/work/util/aoi-hbase/trunk/src/main/resources/config.properties";
		else	
			config = "/home/aoi/aoi-hbase/conf/config.properties"; 
		FileInputStream fis = new FileInputStream(config);
		properties.load(fis);
		fis.close(); 
		
		String hbaseTable = properties.getProperty("com.cmcc.aoi.ua.hbaseTable");
		String hbaseFamily = properties.getProperty("com.cmcc.aoi.ua.hbaseFamily"); 
		String sourceFilePath=properties.getProperty("com.cmcc.aoi.ua.sourceFilePath");
		String archivelogsPath=properties.getProperty("com.cmcc.aoi.ua.archivelogsPath");
		boolean isDebug= Integer.parseInt( properties.getProperty("com.cmcc.aoi.ua.isDebug")) == 0 ? false : true;
		String sourceFileName = properties.getProperty("com.cmcc.aoi.ua.sourceFileName"); 
		 
		String[] hbaseTableName=hbaseTable.split(",");  // table
		String[] hbaseFamilyName=hbaseFamily.split("&");// family  
		String[] sourceFileNameArr=sourceFileName.split(","); 
		 
		
		ScheduledExecutorService service = Executors.newScheduledThreadPool(2);  
		service.scheduleAtFixedRate(new DeviceReadThread (sourceFileNameArr[0],hbaseTableName[0],hbaseFamilyName[0].split(","),sourceFilePath,archivelogsPath,isDebug) 
			               ,0, 1,TimeUnit.HOURS); 
		service.scheduleAtFixedRate(new AppReadThread (sourceFileNameArr[1],hbaseTableName[1],hbaseFamilyName[1].split(","),sourceFilePath,archivelogsPath,isDebug) 
                         ,0, 1,TimeUnit.HOURS); 
	}
}

 

 

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader; 
import java.io.IOException;
import java.util.Date;  
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; 
import Model.Device; 
import com.alibaba.fastjson.JSON; 

public class DeviceReadThread extends BaseRunnabler {

	static Logger logger = LoggerFactory.getLogger(DeviceReadThread.class);
	
	public DeviceReadThread(String sourceFileName, String hbaseTable,
			String[] hbaseFamily, String sourceFilePath,
			String archivelogsPath, boolean isDebug) {
		super(sourceFileName, hbaseTable, hbaseFamily, sourceFilePath, archivelogsPath,
				isDebug); 
	}
	 
	public void processFile(IOperator hu) {
		FileReader logReader = null;
		BufferedReader logBufferedReader = null;
		try { 
			File logFile = new File(sourceFilePath+sourceFileName);
			logReader = new FileReader(logFile);
			logBufferedReader = new BufferedReader(logReader);
			String temp = logBufferedReader.readLine();
			//logger.error(" temp is  " + temp );
			while ( temp  != null) {
				Device device = JSON.parseObject(temp, Device.class); 
				//logger.error(" device is null ? " + ( device == null ) );
				
				String[][] s = new String[][] {
						{ device.getLid(), hbaseFamily[0], "lid" , device.getLid() } ,
						{ device.getLid(), hbaseFamily[1], "date", (new Date()).toString() }, 
						{ device.getLid(), hbaseFamily[2], "os", device.getOs() },
						{ device.getLid(), hbaseFamily[2], "osv", device.getOsv()} };
				hu.writeMultRow(hbaseTable, s);
				logger.info(" hbase util end "   );
				temp = logBufferedReader.readLine();
			}
		} catch (Exception e) {
			logger.error(" DeviceReadThread error "   );
			e.printStackTrace();
		} finally { 
			try {
				logBufferedReader.close();
			} catch (IOException e) { 
				e.printStackTrace();
			}
			try {
				logReader.close();
			} catch (IOException e) { 
				e.printStackTrace();
			}
		}
	} 

}

 

import java.io.File; 
import java.util.Arrays; 

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import com.cmcc.aoi.util.FileUtil;

public abstract class BaseRunnabler implements Runnable{
	protected static Logger logger = LoggerFactory.getLogger(BaseRunnabler.class);
	
	String sourceFileName=""; // 读取文件路径  
	String hbaseTable="";  // hbase  表名
	String [] hbaseFamily=null;    // 行列簇名 
	String sourceFilePath ;
	String archivelogsPath ;
	boolean isDebug; 
		
	
	public BaseRunnabler(String sourceFileName,String hbaseTable,String [] hbaseFamily  ,String sourceFilePath,String archivelogsPath,boolean isDebug ){
		this.hbaseTable=hbaseTable;
		this.hbaseFamily = hbaseFamily; 
		this.sourceFileName=sourceFileName;  
		this.sourceFilePath = sourceFilePath;
		this.archivelogsPath = archivelogsPath;
		this.isDebug = isDebug; 
	}	
	
	@Override
	public void run() {
		try{
		IOperator hu = new HbaseUtil( hbaseTable,hbaseFamily);
    	hu.createTable(hbaseTable,hbaseFamily );  
    	
    	File file=new File(sourceFilePath);
		 File[] tempFileList = file.listFiles();
		 Arrays.sort(tempFileList);
		 for (File tempFile: tempFileList) {
			 if (tempFile.isFile() && tempFile.getName().contains(sourceFileName +".") ) {
				try{
					try{
						processFile(hu);
					}catch (Exception e) { 
						logger.error("readfile error ,must continue to protect  to read  other file ");
						continue;
					}
				
				removeFile(tempFile);
				
				}catch (Exception e2) { 
					logger.error(" one file has an error ,other  file must continue to do this task ");
				}
			}
		 }
    	
    	 
		}catch (Exception e) {
			e.printStackTrace();
		}

	}
	
	public abstract void processFile(IOperator hu) throws Exception; 	
	
	private void removeFile(File file) {
		if (isDebug) {
			 File path = new File(archivelogsPath);
			 if (!path.exists()) {
				 path.mkdirs();
			 }
			 FileUtil.moveFile(file, new File(archivelogsPath,file.getName()));
			 logger.info("remove file :" + file.getName());
		 }else{
			 file.delete();
			 logger.info("delete file :" + file.getName());
		 }
	}
}



 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

捐助开发者

在兴趣的驱动下,写一个免费的东西,有欣喜,也还有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(右上角的爱心标志,支持支付宝和PayPal捐助),没钱捧个人场,谢谢各位。



 
 
 谢谢您的赞助,我会做的更好!

 

 

相关实践学习
云数据库HBase版使用教程
  相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情: https://cn.aliyun.com/product/hbase   ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
4月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
84 0
|
8月前
|
SQL 分布式计算 Hadoop
Hadoop集群hbase的安装
Hadoop集群hbase的安装
143 0
|
2天前
|
存储 分布式计算 Hadoop
基于Hadoop分布式数据库HBase1.0部署及使用
基于Hadoop分布式数据库HBase1.0部署及使用
|
4月前
|
分布式计算 Hadoop 关系型数据库
Hadoop任务scan Hbase 导出数据量变小分析
Hadoop任务scan Hbase 导出数据量变小分析
53 0
|
3月前
|
存储 分布式计算 Hadoop
Hadoop中的HBase是什么?请解释其作用和用途。
Hadoop中的HBase是什么?请解释其作用和用途。
40 0
|
4月前
|
SQL 分布式计算 Hadoop
Hadoop学习笔记(HDP)-Part.16 安装HBase
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
82 1
Hadoop学习笔记(HDP)-Part.16 安装HBase
|
8月前
|
分布式计算 Hadoop 分布式数据库
开机时监听Hadoop和Zookpeer启动之后再启动Hbase
开机时监听Hadoop和Zookpeer启动之后再启动Hbase
|
8月前
|
存储 分布式计算 Hadoop
Hadoop之Hbase安装和配置
Hadoop之Hbase安装和配置
748 0
|
SQL 分布式计算 安全
hadoop+hbase+zookeeper+hive
hadoop+hbase+zookeeper+hive
196 0
hadoop+hbase+zookeeper+hive
|
分布式计算 安全 Hadoop
hadoop+hbase+zookeeper安装指南
hadoop+hbase+zookeeper安装指南
217 0
hadoop+hbase+zookeeper安装指南