任务
package com.yeepay.sxf.compensation; import java.sql.Date; /** * 任务 * @author sxf * */ public class OrderResponse { /** * 订单id */ private String orderId; /** * 上次查询时间 */ private Date lastTime; /** * 下次查询时间 */ private Date nextTime; /** * 相隔时常 */ private Long timeLong; /** * 级别 */ private Integer leveInteger; public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public Date getLastTime() { return lastTime; } public void setLastTime(Date lastTime) { this.lastTime = lastTime; } public Date getNextTime() { return nextTime; } public void setNextTime(Date nextTime) { this.nextTime = nextTime; } public Long getTimeLong() { return timeLong; } public void setTimeLong(Long timeLong) { this.timeLong = timeLong; } public Integer getLeveInteger() { return leveInteger; } public void setLeveInteger(Integer leveInteger) { this.leveInteger = leveInteger; } }
类似优先级的队列结构
package com.yeepay.sxf.compensation; import java.sql.Date; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.Random; /** * 该数据结构类似优先级队列。(生产线程往该数据结构存放任务,消费线程从该数据结构取任务执行) * (1)生产线程根据等级往不同的容器中存放任务。 * (2)消费线程根据不同等级的权重,每次取用时获取一个根据权重计算出来的等级,从不同等级的容器中获取任务。 * (3)任务是有执行时间的。如果当前时间不到该任务的执行时间,则让获取该任务的当前线程wait相应的时间。 * @author sxf * */ public class CompensationQueue { /** * 一级锁 */ private final Object firstLock=new Object(); /** * 二级锁 */ private final Object secondLock=new Object(); /** * 三级锁 */ private final Object thirdLock=new Object(); /** * 一级订单 */ private final LinkedList<OrderResponse> firstList=new LinkedList<OrderResponse>(); /** * 二级订单 */ private final LinkedList<OrderResponse> secondList=new LinkedList<OrderResponse>(); /** * 三级订单 */ private final LinkedList<OrderResponse> thirdList=new LinkedList<OrderResponse>(); /** * 一级读取次数 */ private Integer readFirstInteger=0; /** * 二级读取次数 */ private Integer readSencodInteger=0; /** * 三级读取次数 */ private Integer readThirdInteger=0; /** * 时间间隔 */ private Integer timeLongInteger=0; /** * 尝试读取几次 */ private Integer reads; /** * 一级别权重 */ private Integer firstWeight; /** * 二级权重 */ private Integer sencodWeight; /** * 三级权重 */ private Integer thirdWeight; /** * 一级读取最大数降级 */ private final Integer readFirstMax; /** * 二级最大数 */ private final Integer readSecondMax; public CompensationQueue(Integer timeLongInteger,Integer reads,Integer firstWeight, Integer sencodWeight,Integer thirdWeight,Integer readFirstMax,final Integer readSecondMax) { super(); this.timeLongInteger = timeLongInteger; this.reads=reads; this.firstWeight=firstWeight; this.sencodWeight=sencodWeight; this.thirdWeight=thirdWeight; this.readFirstMax=readFirstMax; this.readSecondMax=readSecondMax; } /** * 存放一笔订单 * @param orderResponse * @return */ public boolean putOrderResponse(OrderResponse orderResponse){ //获取数据等级 Integer ad=orderResponse.getLeveInteger(); //根据等级放入相应的等级容器中 if(ad==1){ try { synchronized (firstLock) { firstList.addFirst(orderResponse); } }finally{ firstLock.notifyAll(); } }else if(ad==2){ try { synchronized (secondList) { secondList.addFirst(orderResponse); } }finally{ secondLock.notifyAll(); } }else if(ad==3){ try { synchronized (thirdLock) { thirdList.addFirst(orderResponse); } }finally{ thirdLock.notifyAll(); } } return false; } /** * 获取一笔订单 * @param orderResponse * @return */ public OrderResponse getOrderResponse(){ //获取权重 Integer leve=getWeightLeve(); int count=0; //一级降二级标识 boolean flagA=false; //二级降三级标识 boolean flagB=false; //三级升一级标识 boolean flagC=false; for(int a=0;a<reads;a++){ if(leve==1||(leve==3&&flagC)){ flagC=false; synchronized (firstLock) { if(firstList.size()>0){ if(readFirstInteger<readFirstMax){ //如果还每到执行时间,就让当前线程休眠多长时间 OrderResponse orderResponse=firstList.getLast(); Date nextDate=orderResponse.getNextTime(); Long aLong=0L; while((aLong=System.currentTimeMillis()-nextDate.getTime())<0){ try { firstLock.wait(aLong); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } readFirstInteger++; return firstList.removeLast(); }else{ readFirstInteger=0; flagA=true; } }else{ flagA=true; } } } if(leve==2||(leve==1&&flagA)||(leve==3&&flagA)){ flagA=false; synchronized (secondLock) { if(secondList.size()>0){ if(readSencodInteger<readSecondMax){ OrderResponse orderResponse=secondList.getLast(); Date nextDate=orderResponse.getNextTime(); Long aLong=0L; while((aLong=System.currentTimeMillis()-nextDate.getTime())<0){ try { secondLock.wait(aLong); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } readSencodInteger++; return secondList.removeLast(); }else{ readSencodInteger=0; flagB=true; } }else{ flagB=true; } } } if(leve==3||(leve==2&&flagB)||(leve==1&&flagB)){ flagB=false; synchronized (thirdLock) { if(thirdList.size()>0){ OrderResponse orderResponse=thirdList.getLast(); Date nextDate=orderResponse.getNextTime(); Long aLong=0L; while((aLong=System.currentTimeMillis()-nextDate.getTime())<0){ try { thirdLock.wait(aLong); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return thirdList.removeLast(); }else{ flagC=true; } } } } return null; } private Integer getWeightLeve(){ Map<Integer, Integer> map=new HashMap<Integer, Integer>(); map.put(firstWeight,1); Integer sencodSum=firstWeight+sencodWeight; map.put(sencodSum, 2); Integer sum=sencodSum+thirdWeight; map.put(thirdWeight, 3); for(Integer sumd:map.keySet()){ Random r = new Random(System.currentTimeMillis()); int radom = Math.abs(r.nextInt())%sum; if(sumd>radom){ return map.get(sumd); } } return 1; } public Integer getReadFirstInteger() { return readFirstInteger; } public void setReadFirstInteger(Integer readFirstInteger) { this.readFirstInteger = readFirstInteger; } public Integer getReadSencodInteger() { return readSencodInteger; } public void setReadSencodInteger(Integer readSencodInteger) { this.readSencodInteger = readSencodInteger; } public Integer getReadThirdInteger() { return readThirdInteger; } public void setReadThirdInteger(Integer readThirdInteger) { this.readThirdInteger = readThirdInteger; } }