读取hdfs中文件并做处理,取出卡号,通过卡号连接hbase查询出对应客户号,写入redis,因为不用输出,所以不调用context.write方法,整个操作在一个map中便可完成
protected HTable connect //setup方法被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作。若是将资源初始化工作放在方法map()中,导致Mapper任务在解析每一行输入时都会进行资源初始化工作,导致重复,程序运行效率不高! protected void setup(Context context) throws IOExcption,InterruptedException{ super.setup(context) String jobName = context.getJobName(); //文件索引值 cartNoIndex = conf.get(jobName + "source.key","7"); //创建hbase连接,hbase-site.xml配置文件需要在jar包中 Configuration config = HBaseConfiguration.create(); connect = new HTable(config,"tableName") } protected void map(writable key,Text value,Context context){ if(value == null || value.toString().trim().isEmpty()){ //计数器,记录处理的条数 context.getCounter(....).increment(1); }else{ String[] values = Utils.split(value,separator,true); //业务逻辑处理 int i = Integer.parseInt(cartNoIndex); if(i<values.length){ cardNo = values[i]; }else{ logger.error("cardNo cannot find"); } //从hbase中查询出对应客户号 String rowkey = HTableManager.generatRowkey(cardNo); Get getResult = new Get(rowkey.getBytes()); Result rs = connect.get(getResult); String curNo = Bytes.toString(rs.getValue("f1".getBytes(),"column_name".getBtes());
RedisClient.getRedisClient().zincrbyset("spending:rank",countNum,custNo);
protected void cleanup(context context)throws IOException,InterruptedException{
super.cleanup(context);
connect.close();
}
public static String[] split(String value,String separator,boolean trimSpace){ String[] rtn = split(value.separator); if(trimSpace && rtn != null){ for(int i=0;i<rtn.length;i++){ rtn[i] = rtn[i].trim(); } } return rtn; } public static String[] split(String value,String separator){ String[] rtn = null; if(value != null){ boolean endBlank = false; if(value.endsWith(separator)){ value +=" "; endBlank = true; } separator = escapeExprSpecialWord(deparator); if(endBlank){ rtn(rtn.length-1) = ""; } } return rtn; } public static String escapeExprSpecialWord(String keyWord){ if(keyword != null && !keyword.isEmpty()){ String[] fbsArr = {"\\","|","(",")"}; for(String key : fbsArr){ if(keyword.contains(key){ keyword = keyword.replace(key,"\\"+key); } } } return keyword; }