ReentrantLock,Condition

简介:       public class ReentrantLockAndConditionTest { public static void main(String[] args) { ReentrantLockQueue queue =new Reen...

 

 

 

public class ReentrantLockAndConditionTest {

    public static void main(String[] args) {
    	ReentrantLockQueue queue =new  ReentrantLockQueue();
    	for (int i = 0; i < 100; i++) {
			queue.put("a");
			String string = queue.getString();
			System.out.println(string); 
		}
    	
	}


    public abstract class MessageQueue<T>{
    	private Queue<T> queue;
    	private List<FailedMessageWrap> resendList;
    	protected int resendSleepInterval = 1000 * 60 ;
    	protected int maxFailedCount = 10;
    	private Lock sendLock = new ReentrantLock();
    	private Condition sendCondition = sendLock.newCondition();
    	private Lock resendLock = new ReentrantLock();
    	private volatile boolean stopRequired ;
    	 
    	public MessageQueue(){ 
    		queue = new LinkedList<T>(); 
    		resendList = new LinkedList<FailedMessageWrap>(); 
    		stopRequired = false;
    		 
    		ExecutorService sendService = Executors.newFixedThreadPool(1);
    		for (int i = 0; i < 1; i++) {
    			sendService.execute(new SendTask());
    		} 
    		Executors.newSingleThreadExecutor().execute(new ResendTask());
    	}
    	 
    	public void send(T message){
    		if(message == null){
    			return;
    		}
    		
    		try {
    			sendLock.lock(); 
    			queue.add(message); 
    			sendCondition.signalAll();
    		}finally{
    			sendLock.unlock();
    		}
    		
    	}
    	 
    	public void stop(){
    		stopRequired = true;
    	}
    	 
    	protected abstract boolean doSend(T message);
    	 
    	class FailedMessageWrap{
    		private T message;
    		private int failedCount;
    		
    		FailedMessageWrap(T message){
    			this.message = message;
    			failedCount = 0;
    		}

    		public int getFailedCount() {
    			return failedCount;
    		}
     
    		public void increaseFailedCount() {
    			this.failedCount += 1;
    		}

    		public T getMessage() {
    			return message;
    		}
    		
    	}
     
    	class SendTask implements Runnable{
    		@Override
    		public void run() {
    			while(!stopRequired){
    				T message;
    				
    				try {
    					sendLock.lock(); 
    					message = queue.poll();
    					if(message == null){
    						try {
    							sendCondition.await();
    						} catch (Exception e) {
    							e.printStackTrace();
    						}
    						continue;
    					}
    				}finally{
    					sendLock.unlock();
    				}
    					
    				if(!doSend(message)){
    					try {
    						resendLock.lock(); 
    						resendList.add(new FailedMessageWrap(message));
    					} finally{
    						resendLock.unlock();
    					}
    				}
    			
    			}
    			
    		}
    	}
    	 
    	class ResendTask implements Runnable{
    		@Override
    		public void run() { 
    			while(!stopRequired){
    				
    				try {
    					Thread.sleep(resendSleepInterval);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				} 
    				List<FailedMessageWrap> removeList = new ArrayList<FailedMessageWrap>();
    				
    				try {
    					resendLock.lock();
    					 
    					for(FailedMessageWrap messageWrap : resendList){
    						if(messageWrap.getFailedCount() > maxFailedCount){
    							removeList.add(messageWrap);
    							continue;
    						}
    						
    						T message =  messageWrap.getMessage(); 
    						if(!doSend(message)){
    							messageWrap.increaseFailedCount();
    						}else{
    							removeList.add(messageWrap);
    						}
    					}
    				 
    					for (FailedMessageWrap messageWrap : removeList) {
    						resendList.remove(messageWrap);
    					}
    				} finally{
    					resendLock.unlock();
    				}
    			
    			}
    		}
    	}
    	

    }
    
    
	
    public static class ReentrantLockQueue{
    	private ReentrantLock  lock = new ReentrantLock();
    	private Queue<String> queue = new LinkedList<String>(); 
    	public  void put(String s){
    		try{
	    		lock.lock();
	    		queue.add(s);
    		}catch(Exception e){
    			
    		}finally{
    			lock.unlock();
    		}
    	}
    	
    	public String getString(){
    		try{
    			lock.lock();
        		String poll = queue.poll();
        		return poll;
    		}catch(Exception e){
    			
    		}finally{
    			lock.unlock();
    		}
			return null;
    	}
    	
    }
	
    
    
	
	

}

 

 

 

 

 

 

 

 

 

 

 

 

 

捐助开发者 

在兴趣的驱动下,写一个免费的东西,有欣喜,也还有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(支持支付宝和微信 以及扣扣群),没钱捧个人场,谢谢各位。

 

个人主页http://knight-black-bob.iteye.com/



 
 
 谢谢您的赞助,我会做的更好!

目录
相关文章
|
9月前
|
安全 Java
一天一个 JUC 工具类 Lock 和 Condition
当谈到Java多线程编程时,我们不可避免地需要处理并发问题。为此Java提供了一个强大的工具包——java.util.concurrent(JUC)
线程同步的方法:Synchronized、Lock、ReentrantLock分析
线程同步的方法:Synchronized、Lock、ReentrantLock分析
|
Java
Callable,Lock,Condition,ReadWriteLock
Callable,Lock,Condition,ReadWriteLock
44 0
|
API
图解ReentrantLock的条件变量Condition机制
图解ReentrantLock的条件变量Condition机制
85 0
图解ReentrantLock的条件变量Condition机制
|
安全 Java
Java并发编程 - AQS 之 Condition
Java并发编程 - AQS 之 Condition
92 0
|
安全 Java
Java并发:Condition详解
Java并发:Condition详解
267 0
Java并发:Condition详解
|
存储 设计模式 Java
深入理解ReentrantLock
同步锁synchronized和重入锁ReentrantLock都是用于并发程序设计必不可少的手段,在JDK 5.0早期版本中,同步锁性能远远低于重入锁,但是在6.0版本之后,jdk对同步锁做了大量的优化,使得同步锁跟重入锁性能差距并不大,并且jdk团队表示,同步锁还有进一步升级优化的空间
深入理解ReentrantLock