Azureus源码剖析(四)

简介:
AEThread只是一个简单的线程类,提供一个抽象方法runSupport 供我们重写,相必之下,AEThread2有几个改进的地方:1)加入了运行锁机制,运行线程和等待线程的结束这两个操作需要争抢运行锁,若锁未放开,则说明线程还在运行,必须等待锁的释放。2)设置了一个守护线程链表,若待创建的线程是守护线程,当链表为空时,新建一个守护线程加入表尾,否则从链表尾部取一个线程节点来使用,类似线程池。3)对于守护线程,若活跃时间超过时限,就会从守护线程链表中删除头部节点,即超时最长时间的节点。
 
复制代码
public abstract class AEThread2 
{
    public static final boolean TRACE_TIMES = false;//是否跟踪线程运行时间
    
    private static final int MIN_RETAINED    = 2;//守护线程最小个数
    private static final int MAX_RETAINED    = 16;//守护线程最大个数
    
    private static final int THREAD_TIMEOUT_CHECK_PERIOD    = 10*1000;//线程超时检查间隔期:10秒
    private static final int THREAD_TIMEOUT                    = 60*1000;//线程超时时限:60秒 
    
    private static final LinkedList    daemon_threads = new LinkedList();//守护线程链表
    
    private static final class JoinLock 
    {
        volatile boolean released = false;//初始状态锁未放开
    }
    
    private static long    last_timeout_check;//最近的超时检查时间
    
    private static long    total_starts;//启动的线程总数
    private static long    total_creates;//创建的线程总数
    
    
    private threadWrapper    wrapper;//线程包装器(实质的线程)
    
    private String name;//线程名称
    private boolean daemon;//是否守护线程
    private int priority = Thread.NORM_PRIORITY;//线程优先级
    private volatile JoinLock lock = new JoinLock();
    
    public AEThread2(String    _name,boolean _daemon )
    {
        name = _name;
        daemon = _daemon;
    }
    public void start()
    {
        JoinLock currentLock = lock;
        JoinLock newLock;
        
        synchronized (currentLock)
        {
            // create new lock in case this is a restart, all old .join()s will be locked on the old thread and thus released by the old thread
            if(currentLock.released)
                newLock = lock = new JoinLock();
            else
                newLock = currentLock;
        }
        
        if ( daemon )
        {//是守护线程
            synchronized( daemon_threads )
            {
                total_starts++;//启动的总线程数加1                
                if ( daemon_threads.isEmpty())
                {//若守护线程链表为空,则创建一个新线程
                    total_creates++;
                    wrapper = new threadWrapper( name, true );            
                }
                else
                {//从守护线程链表中移除尾部节点
                    wrapper = (threadWrapper)daemon_threads.removeLast();
                    wrapper.setName( name );
                }
            }
        }
        else
        {//不是守护线程
        
            wrapper = new threadWrapper( name, false );
        }
        
        if ( priority != wrapper.getPriority() )
        {//设置线程优先级
            wrapper.setPriority( priority );
        }
        
        wrapper.currentLock = newLock;//传递锁
        
        wrapper.start( this, name );//真正启动线程运行
    }
    
    public void setPriority(int _priority )
    {
        priority    = _priority;
        if ( wrapper != null )
        {
            wrapper.setPriority( priority );
        }
    }
    
    public void setName(String    s )
    {
        name    = s;
        if ( wrapper != null )
        {
            wrapper.setName( name );
        }
    }
    
    public String getName()
    {
        return( name );
    }
    
    public void interrupt()
    {
        if ( wrapper == null )
        {
            throw new IllegalStateException( "Interrupted before started!" );
            
        }
        else
        {
            wrapper.interrupt();//中断线程
        }
    }
    
    public boolean isCurrentThread()
    {
        return( wrapper == Thread.currentThread());
    }
    
    public String toString()
    {
        if ( wrapper == null )
        {
            return( name + " [daemon=" + daemon + ",priority=" + priority + "]" );
            
        }
        else
        {
            return( wrapper.toString());
        }
    }
    
    public abstract void run();//实质的线程函数,在包装器类中调用
    
    public static boolean isOurThread(Thread thread )
    {
        return( AEThread.isOurThread( thread ));
    }
    
    public static void setOurThread()
    {
        AEThread.setOurThread();
    }
    
    public static void setOurThread(Thread    thread )
    {
        AEThread.setOurThread( thread );
    }
    
    //实质的线程,从Thread类继承
    protected static class threadWrapper extends Thread
    {
        private AESemaphore sem;
        private AEThread2    target; //被包装的目标
        private JoinLock    currentLock;//锁
        
        private long        last_active_time;//最近活跃时间
        
        protected threadWrapper(String name,boolean daemon )
        {
            super( name );
            setDaemon( daemon );//设置是否守护线程
        }
        
        public void run()
        {
            while( true )
            {
                synchronized( currentLock )
                {
                    try
                    {
                        if ( TRACE_TIMES )
                        {
                            long     start_time     = SystemTime.getHighPrecisionCounter();
                            long    start_cpu     = Java15Utils.getThreadCPUTime();

                            try
                            {
                                target.run();//实质的线程函数

                            }
                            finally
                            {
                                long    time_diff     = ( SystemTime.getHighPrecisionCounter() - start_time )/1000000;
                                long    cpu_diff    = ( Java15Utils.getThreadCPUTime() - start_cpu ) / 1000000;
                                
                                if ( cpu_diff > 10 || time_diff > 10 )
                                {
                                    System.out.println( TimeFormatter.milliStamp() + ": Thread: " + target.getName() + ": " + cpu_diff + "/" + time_diff );
                                }
                            }
                        }
                        else
                        {
                            target.run();
                        }
                                                
                    }
                    catch( Throwable e )
                    {
                        DebugLight.printStackTrace(e);
                        
                    }
                    finally
                    {//线程运行结束
                        target = null;
                        currentLock.released = true;//释放锁
                        currentLock.notifyAll();//通知其他阻塞线程                        
                    }
                }
                                
                if ( isInterrupted() || !Thread.currentThread().isDaemon())
                {//中断或不是守护线程
                    break;
                    
                }
                else
                {//是守护线程                    
                    synchronized( daemon_threads )
                    {                    
                        last_active_time    = SystemTime.getCurrentTime();                        
                        if (last_active_time < last_timeout_check ||last_active_time - last_timeout_check > THREAD_TIMEOUT_CHECK_PERIOD )
                        {
                            last_timeout_check    = last_active_time;
                            
                            while( daemon_threads.size() > 0 && daemon_threads.size() > MIN_RETAINED )
                            {                                
                                threadWrapper thread = (threadWrapper)daemon_threads.getFirst();//取链表头部                                
                                long    thread_time = thread.last_active_time;                                
                                if (last_active_time < thread_time ||last_active_time - thread_time > THREAD_TIMEOUT )
                                {                                    
                                    daemon_threads.removeFirst();//移除超时节点                                    
                                    thread.retire();//被移除节点信号量释放
                                    
                                }
                                else
                                {                                
                                    break;
                                }
                            }
                        }                
                        if ( daemon_threads.size() >= MAX_RETAINED )
                        {                        
                            return;
                        }
                        daemon_threads.addLast( this );//将此守护节点加入守护线程链表尾部,这样头部应该是最先超时的节点
                        setName( "AEThead2:parked[" + daemon_threads.size() + "]" );                        
                        // System.out.println( "AEThread2: queue=" + daemon_threads.size() + ",creates=" + total_creates + ",starts=" + total_starts );
                    }                    
                    sem.reserve();//新加入节点信号量增加                    
                    if ( target == null )
                    {                        
                        break;
                    }
                }
            }
        }
        
        protected void start(AEThread2    _target,String        _name )
        {
            target    = _target;            
            setName( _name );            
            if ( sem == null )
            {                
                 sem = new AESemaphore( "AEThread2" );                 
                 super.start();//启动线程             
            }
            else
            {                
                sem.release();
            }
        }
        
        protected void retire()
        {            
            sem.release();
        }
    }
    
    public void join()
    {//等待线程结束
        JoinLock currentLock = lock;
        // sync lock will be blocked by the thread        
        synchronized( currentLock )
        {            
            // wait in case the thread is not running yet            
            while (!currentLock.released )
            {//若锁还未释放,则说明线程还在运行,则继续等待锁的释放,                
                try
                {
                    currentLock.wait();                    
                }
                catch( InterruptedException e ){}
            }
        }
    }
}
复制代码



本文转自Phinecos(洞庭散人)博客园博客,原文链接:http://www.cnblogs.com/phinecos/archive/2009/05/07/1451465.html,如需转载请自行联系原作者

目录
相关文章
|
5月前
muduo源码剖析之EventLoopThread
EventLoopThread类包装了一个thread类和一个EventLoop类,(one loop per thread)是封装了一个EventLoop的独立线程。
41 0
|
5月前
|
Java
muduo源码剖析之EventLoopThreadPool
EventLoopThreadPool是EventLoopThread类的线程池类封装了若干个EventLoopThread的线程池,所有者是一个外部的EventLoop。
33 0
|
存储 机器学习/深度学习 算法
源码剖析之ConcurrentHashMap
​ JDK8中ConcurrentHashMap的结构是:数组+链表+红黑树。 ​ 因为在hash冲突严重的情况下,链表的查询效率是O(n),所以jdk8中改成了单个链表的个数大于8时,数组长度小于64就扩容,数组长度大于等于64,则链表会转换为红黑树,这样以空间换时间,查询效率会变O(nlogn)。 ​ 红黑树在Node数组内部存储的不是一个TreeNode对象,而是一个TreeBin对象,TreeBin内部维持着一个红黑树。 ​ 在JDK8中ConcurrentHashMap最经点的实现是使用CAS+synchronized+volatile 来保证并发安全
107 0
源码剖析之ConcurrentHashMap
非公平锁实现原理+源码解读
非公平锁实现原理+源码解读
|
存储 SQL 分布式计算
【源码解读】| LiveListenerBus源码解读(下)
【源码解读】| LiveListenerBus源码解读
153 0
【源码解读】| LiveListenerBus源码解读(下)
|
缓存 分布式计算 监控
【源码解读】| LiveListenerBus源码解读(上)
【源码解读】| LiveListenerBus源码解读
164 0
【源码解读】| LiveListenerBus源码解读(上)