高并发编程-自定义简易的线程池(1),体会原理

简介: 高并发编程-自定义简易的线程池(1),体会原理

20191031000734676.png

概述


我们工作中,并发编程必然离不开jdk提供的j.u.c中的线程池 ,假设让我们自己去设计一个线程池,该从哪几个方面来着手考虑呢?


首先: 既然是线程池 , 那必然 有个初始化的线程数量 和 最大数量 ----> 两个属性 : init 和 max


其次:当线程池中的线程达到了 init 数量,但还没到max 数量的时候,将任务放入任务队列中,而不是直接开辟新的线程,是不是一种更好的设计? 所以 引入 任务队列,存放任务


最后:当线程池中的线程 把 任务队列也塞满了,也达到了Max的值,新的线程过来,我们改如何处理呢? 是不是指定拒绝策略更合适呢? 比如 直接拒绝,抛出异常,放入临时队列等等。所以 拒绝策略也是需要的


总结一下


  1. init (coreSize) / max , 线程池初始化大小和最大线程数
  2. 任务队列
  3. 拒绝策略
  4. 。。。


示例

逐步实现该需求, 先把基本的流程调通,这里我们先实现定义线程池数量的功能 ,代码如下

package com.artisan.test;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.IntStream;
public class SimpleThreadPool {
    // 线程数
    private int size;
    // 默认线程数
    private static int DEFAULT_SIZE = 10;
    // 任务队列
    private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList();
    // 工作线程集合
    private final static List<WorkTask> THREAD_QUEUE = new ArrayList<>();
    private static volatile int seq = 0;
    private static final ThreadGroup GROUP = new ThreadGroup("Pool_Group");
    private static final String THREAD_PREFIX = "SIMPLE_THREAD_POOL-";
    /**
     * 默认构造函数
     * 默认10个线程
     */
    public SimpleThreadPool() {
        this(DEFAULT_SIZE);
    }
    /**
     * 线程池中的线程数量
     *
     * @param size
     */
    public SimpleThreadPool(int size) {
        this.size = size;
        // 初始化
        init();
    }
    /**
     * 初始化
     */
    private void init() {
        for (int i = 0; i < size; i++) {
            createWorkTask();
        }
    }
    /**
     * 创建工作线程
     */
    private void createWorkTask() {
        // 创建工作线程
        WorkTask workTask = new WorkTask(GROUP, THREAD_PREFIX + (seq++));
        // 启动
        workTask.start();
        // 加入到工作线程集合
        THREAD_QUEUE.add(workTask);
    }
    /**
     * 将任务runnable 提交到 TASK_QUEUE
     *
     * @param runnable
     */
    public void submit(Runnable runnable) {
        synchronized (TASK_QUEUE) {
            // 添加到队列的最后面
            TASK_QUEUE.addLast(runnable);
            // 唤醒所有正在等待的线程
            TASK_QUEUE.notifyAll();
        }
    }
    /**
     * 线程状态
     */
    enum TaskState {
        FREE,
        RUNNING,
        BLOCKED,
        DEAD
    }
    /**
     * private 私有类
     * 继承Thread 的 工作线程
     */
    private static class WorkTask extends Thread {
        // 默认状态  FREE
        private volatile TaskState state = TaskState.FREE;
        public WorkTask(ThreadGroup threadGroup, String name) {
            super(threadGroup, name);
        }
        /**
         * 获取线程状态
         *
         * @return
         */
        public TaskState getTaskState() {
            return this.state;
        }
        /**
         * 重写run方法
         */
        @Override
        public void run() {
            // 跳出while的标识
            OUTER:
            // 只要不是DEAD 状态,一直从队列中获取消息
            while (state != TaskState.DEAD) {
                Runnable runnable;
                // 对队列进行加锁操作
                synchronized (TASK_QUEUE) {
                    // 如果队列为空,wait
                    while (TASK_QUEUE.isEmpty()) {
                        try {
                            // 更改状态
                            state = TaskState.BLOCKED;
                            // wait 放弃该锁
                            TASK_QUEUE.wait();
                        } catch (InterruptedException e) {
                            // 如果有个线程在wait,终止该线程会被InterruptedException异常捕获,
                            // break OUTER 跳出while循环,从而达到终止这个线程
                            break OUTER;
                        }
                    }
                    // TASK_QUEUE不为空 从队列中获取一个Runnable对象
                    // Removes and returns the first element from this list
                    runnable = TASK_QUEUE.removeFirst();
                }
                if (runnable != null) {
                    // 运行状态
                    state = TaskState.RUNNING;
                    // 运行
                    runnable.run();
                    // 运行结束后 置为FREE
                    state = TaskState.FREE;
                }
            }
        }
        /**
         * 关闭线程
         */
        public void close() {
            this.state = TaskState.DEAD;
        }
    }
    /**
     * 测试
     *
     * @param args
     */
    public static void main(String[] args) {
        SimpleThreadPool simpleThreadPool = new SimpleThreadPool();
//        for (int i = 0; i < 50; i++) {
//            simpleThreadPool.submit(()->{
//                try {
//                    System.out.println(Thread.currentThread().getName() + " is running" );
//                    Thread.sleep(1_000);
//                    System.out.println(Thread.currentThread().getName() + " finished" );
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//            });
//        }
        IntStream.range(0, 50)
                .forEach(i ->
                        simpleThreadPool.submit(() -> {
                            try {
                                System.out.println(Thread.currentThread().getName() +  "____" + i +  " start");
                                Thread.sleep(1_000);
                                System.out.println(Thread.currentThread().getName() + "____" + i +" finished");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }));
    }
}


运行:

SIMPLE_THREAD_POOL-9____0 start
SIMPLE_THREAD_POOL-4____5 start
SIMPLE_THREAD_POOL-0____9 start
SIMPLE_THREAD_POOL-5____4 start
SIMPLE_THREAD_POOL-6____3 start
SIMPLE_THREAD_POOL-8____1 start
SIMPLE_THREAD_POOL-7____2 start
SIMPLE_THREAD_POOL-1____8 start
SIMPLE_THREAD_POOL-3____6 start
SIMPLE_THREAD_POOL-2____7 start
SIMPLE_THREAD_POOL-4____5 finished
SIMPLE_THREAD_POOL-6____3 finished
SIMPLE_THREAD_POOL-5____4 finished
SIMPLE_THREAD_POOL-3____6 finished
SIMPLE_THREAD_POOL-0____9 finished
SIMPLE_THREAD_POOL-9____0 finished
SIMPLE_THREAD_POOL-0____13 start
SIMPLE_THREAD_POOL-2____7 finished
SIMPLE_THREAD_POOL-5____12 start
SIMPLE_THREAD_POOL-1____8 finished
SIMPLE_THREAD_POOL-6____11 start
SIMPLE_THREAD_POOL-7____2 finished
SIMPLE_THREAD_POOL-4____10 start
SIMPLE_THREAD_POOL-8____1 finished
SIMPLE_THREAD_POOL-7____17 start
SIMPLE_THREAD_POOL-1____16 start
SIMPLE_THREAD_POOL-2____15 start
SIMPLE_THREAD_POOL-9____14 start
SIMPLE_THREAD_POOL-8____18 start
SIMPLE_THREAD_POOL-3____19 start
SIMPLE_THREAD_POOL-0____13 finished
SIMPLE_THREAD_POOL-0____20 start
SIMPLE_THREAD_POOL-5____12 finished
SIMPLE_THREAD_POOL-5____21 start
SIMPLE_THREAD_POOL-6____11 finished
SIMPLE_THREAD_POOL-6____22 start
SIMPLE_THREAD_POOL-4____10 finished
SIMPLE_THREAD_POOL-4____23 start
SIMPLE_THREAD_POOL-7____17 finished
SIMPLE_THREAD_POOL-7____24 start
SIMPLE_THREAD_POOL-1____16 finished
SIMPLE_THREAD_POOL-1____25 start
SIMPLE_THREAD_POOL-2____15 finished
SIMPLE_THREAD_POOL-2____26 start
SIMPLE_THREAD_POOL-9____14 finished
SIMPLE_THREAD_POOL-9____27 start
SIMPLE_THREAD_POOL-8____18 finished
SIMPLE_THREAD_POOL-8____28 start
SIMPLE_THREAD_POOL-3____19 finished
SIMPLE_THREAD_POOL-3____29 start
SIMPLE_THREAD_POOL-5____21 finished
SIMPLE_THREAD_POOL-2____26 finished
SIMPLE_THREAD_POOL-1____25 finished
SIMPLE_THREAD_POOL-7____24 finished
SIMPLE_THREAD_POOL-4____23 finished
SIMPLE_THREAD_POOL-0____20 finished
SIMPLE_THREAD_POOL-6____22 finished
SIMPLE_THREAD_POOL-0____35 start
SIMPLE_THREAD_POOL-4____34 start
SIMPLE_THREAD_POOL-7____33 start
SIMPLE_THREAD_POOL-1____32 start
SIMPLE_THREAD_POOL-8____28 finished
SIMPLE_THREAD_POOL-3____29 finished
SIMPLE_THREAD_POOL-2____31 start
SIMPLE_THREAD_POOL-9____27 finished
SIMPLE_THREAD_POOL-5____30 start
SIMPLE_THREAD_POOL-9____39 start
SIMPLE_THREAD_POOL-3____38 start
SIMPLE_THREAD_POOL-8____37 start
SIMPLE_THREAD_POOL-6____36 start
SIMPLE_THREAD_POOL-8____37 finished
SIMPLE_THREAD_POOL-2____31 finished
SIMPLE_THREAD_POOL-5____30 finished
SIMPLE_THREAD_POOL-3____38 finished
SIMPLE_THREAD_POOL-9____39 finished
SIMPLE_THREAD_POOL-6____36 finished
SIMPLE_THREAD_POOL-9____44 start
SIMPLE_THREAD_POOL-0____35 finished
SIMPLE_THREAD_POOL-4____34 finished
SIMPLE_THREAD_POOL-3____43 start
SIMPLE_THREAD_POOL-4____47 start
SIMPLE_THREAD_POOL-7____33 finished
SIMPLE_THREAD_POOL-7____48 start
SIMPLE_THREAD_POOL-5____42 start
SIMPLE_THREAD_POOL-1____32 finished
SIMPLE_THREAD_POOL-2____41 start
SIMPLE_THREAD_POOL-8____40 start
SIMPLE_THREAD_POOL-1____49 start
SIMPLE_THREAD_POOL-0____46 start
SIMPLE_THREAD_POOL-6____45 start
SIMPLE_THREAD_POOL-3____43 finished
SIMPLE_THREAD_POOL-5____42 finished
SIMPLE_THREAD_POOL-7____48 finished
SIMPLE_THREAD_POOL-4____47 finished
SIMPLE_THREAD_POOL-9____44 finished
SIMPLE_THREAD_POOL-6____45 finished
SIMPLE_THREAD_POOL-0____46 finished
SIMPLE_THREAD_POOL-1____49 finished
SIMPLE_THREAD_POOL-8____40 finished
SIMPLE_THREAD_POOL-2____41 finished


可知: 虽然定义了50个任务,但是根据日志来看,是由10个线程来完成的,符合预期。


相关文章
|
2月前
|
Java
如何在Java中进行多线程编程
Java多线程编程常用方式包括:继承Thread类、实现Runnable接口、Callable接口(可返回结果)及使用线程池。推荐线程池以提升性能,避免频繁创建线程。结合同步与通信机制,可有效管理并发任务。
152 6
|
5月前
|
Java API 微服务
为什么虚拟线程将改变Java并发编程?
为什么虚拟线程将改变Java并发编程?
306 83
|
2月前
|
存储 缓存 监控
什么是线程池?它的工作原理?
我是小假 期待与你的下一次相遇 ~
189 1
|
2月前
|
Java 调度 数据库
Python threading模块:多线程编程的实战指南
本文深入讲解Python多线程编程,涵盖threading模块的核心用法:线程创建、生命周期、同步机制(锁、信号量、条件变量)、线程通信(队列)、守护线程与线程池应用。结合实战案例,如多线程下载器,帮助开发者提升程序并发性能,适用于I/O密集型任务处理。
271 0
|
3月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
239 16
|
3月前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案
|
4月前
|
数据采集 消息中间件 并行计算
Python多线程与多进程性能对比:从原理到实战的深度解析
在Python编程中,多线程与多进程是提升并发性能的关键手段。本文通过实验数据、代码示例和通俗比喻,深入解析两者在不同任务类型下的性能表现,帮助开发者科学选择并发策略,优化程序效率。
286 1
|
5月前
|
数据采集 监控 网络协议
基于aiohttp的高并发爬虫实战:从原理到代码的完整指南
在数据驱动时代,传统同步爬虫效率低下,而基于Python的aiohttp库可构建高并发异步爬虫。本文通过实战案例解析aiohttp的核心组件与优化策略,包括信号量控制、连接池复用、异常处理等,并探讨代理集成、分布式架构及反爬应对方案,助你打造高性能、稳定可靠的网络爬虫系统。
336 0
|
6月前
|
数据采集 网络协议 前端开发
Python多线程爬虫模板:从原理到实战的完整指南
多线程爬虫通过并发请求大幅提升数据采集效率,适用于大规模网页抓取。本文详解其原理与实现,涵盖任务队列、线程池、会话保持、异常处理、反爬对抗等核心技术,并提供可扩展的Python模板代码,助力高效稳定的数据采集实践。
281 0
|
存储 NoSQL Java
探索Java分布式锁:在高并发环境下的同步访问实现与优化
【6月更文挑战第30天】Java分布式锁在高并发下确保数据一致性,通过Redis的SETNX、ZooKeeper的临时节点、数据库操作等方式实现。优化策略包括锁超时重试、续期、公平性及性能提升,关键在于平衡同步与效率,适应大规模分布式系统的需求。
470 1

热门文章

最新文章