Java多线程案例-Java多线程(3)

简介: Java多线程案例-Java多线程(3)

单例模式

       单例模式是常见的设计模式之一, 那什么是设计模式呢?

设计模式 : 设计模式好比象棋中的 "棋谱". 红方当头炮, 黑方马来跳. 针对红方的一些走法, 黑方应招的时候有 一些固定的套路. 按照套路来走局势就不会吃亏. 软件开发中也有很多常见的 "问题场景". 针对这些问题场景, 大佬们总结出了一些固定的套路. 按照 这个套路来实现代码, 也不会吃亏.

       单例模式可以保证某个类在程序中只存在唯一一份实例对象.

单例模式的具体实现分为"饿汉" 和 "懒汉" 两种

       饿汉模式

       也就是在加载类的时候, 就生成一个类的实例

class Singleton {
    // 唯一实例
    private static Singleton instance = new Singleton();
 
    // 禁止对外new 实例对象
    private Singleton() {};
 
    // 获取实例对象的方法
    public static Singleton getInstance() {
        return instance;
    }
}

Singleton类中的instance被static修饰, 也就是说, 这个instance目前归Singleton所有, 而不是单个Singleton的对象,instance属于Singleton这个类, 而不属于任何类对象.

这里将无参构造器使用private的方法给禁止调用, 也就无法调用无参构造方法来进行创建多个实例:

此处, 在类的内部就将实例创建好了, 同时禁止外部创建实例, 这样就可以保证单例的特性了.

例如:

public class Main {
    public static void main(String[] args) {
        Singleton test1 = Singleton.getInstance();
        Singleton test2 = Singleton.getInstance();
        System.out.println(test2 == test1);
    }
}

这个静态字段instance在这个Singleton类加载的时候就已经生成好了.

此处的return instance仅仅只是读取操作, 还没有涉及到修改操作, 所以饿汉模式总是线程安全的

       懒汉模式

               单线程版

class Singleton {
    private static Singleton instance = null;
    private Singleton() {};
 
    public static Singleton getInstance() {
        if (instance == null) {
            instance = new Singleton();
        }
        return instance;
    }
}

       懒汉模式只有在调用的时候才会新建对象. 这里同样使用private修饰无参构造方法来进制new实例对象, 同时提供getInstance方法来获取这个类的唯一实例, 此处的instance是默认null的, 只有在调用getInstance的时候才会创建唯一实例.

                多线程版

       但如果是多个线程一起调用, 这种情况下他真的也只会生成一个对象吗, 对比于饿汉模式, 饿汉模式值存在return instance 的读取操作, 他的唯一实例是在创建这个类的时候就已经生成好了, 并不需要写入操作, 而我们的懒汉模式, 存在读: instance == null 和 写: instance = new Singleton()的操作. 也就是说, 懒汉模式是线程不安全的, 在多线程下, 就有可能会new出多个对象来.

       但是也有人会问, 不就是new处一个对象, 这个影响应该不大呀, 不能这样想, 假设我们的实例对象如果有100G的大小, 或者是1T呢, 线程1多new出一个, 线程2多new出一个1T, 那么就是妥妥的消耗计算机内存资源.

       那么如何解决线程安全呢? 无非就是给这些操作进行加锁, 然后给instance字段加上volatile关键字:

class Singleton {
    private static volatile Singleton instance = null;
    private Singleton() {};
 
    public static Singleton getInstance() {
        if (instance == null){
            synchronized (Singleton.class) {
                if (instance == null) {
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}

为什么这里面的instance要判断两次?

       这两个if判断看似一样, 但是实际上他们两个执行的动机差别很大, if中间间隔了一个synchronized, 但是加锁会导致锁竞争从而会让线程阻塞等待

       例如: 当线程1 和线程2 都执行到了第一个if判断语句的时候, 这个时候, 两个线程都读取到了instance == null 为true, 于是就都开始往下执行, 此时遇到sychronized就会发生锁竞争, 此时假设线程1 先拿到锁, 于是线程1就将这个唯一实例给new出来了, 然后解锁, 随后线程2又拿到了这个锁, 但是由于之前也是读取到的instance == null为true, 如果没有遇到第二个if判断instance是否为空, 那么就会直接new对象, 生成两个实例对象, 也就不满足线程安全了.

       这样做的好处是, 加锁和解锁其实是一个开销比较高的事情, 而这种懒汉模式的线程不安全也只是发生在首次创建唯一实例的时候, 后续就不需要继续加锁了.

为什么要加valotile?

       进制指令重排序, 保证字段instance 的内存可见性. 例如在线程1修改了instance之后, 需要立马对通知其他线程, instance已经被修改, 此时就不需要去继续锁竞争.

阻塞队列

       什么是阻塞队列?

       阻塞队列是一种特殊的队列, 他也是先进先出, 但是他在这个先进先出的基础上增加了阻塞等待的功能, 它具有如下的特性:

  1. 当队列为空的时候就会阻塞等待, 直到有新的元素入列, 才会结束阻塞
  2. 当队列为满的时候就会阻塞等待, 直到有元素出列, 才会结束阻塞

这种阻塞原理其实是一个典型的生产消费模型, 也就是通过一个容器来解决两个生产者和消费者之间的强耦合关系

如下:  消费者发出100个请求, 但是生产者就只能同时处理一个请求.

现在如果消费者有1000个请求, 但是生产者因为请求量过大而'挂掉了, 这就会直接影响到生产者, 导致生产者也'挂了',  两边的相关性太大, 一个被影响, 能直接影响到另外一个线程, 这就叫做高耦合.

但是我们在他两之间加上一个阻塞队列呢, 让生产者能够按顺序, 一个一个的去处理消费者的请求, 那事情不就得到了解决:

        这个阻塞队列就相当于一个缓冲区, 平衡了生产者和消费者之间的处理差. 就比如在各大电商平台上都会有秒杀的活动, 这个时候 如果短时间内有大量请求, 如果没有这个阻塞队列, 服务器直接对这些请求进行处理, 很可能就因为处理量巨大而到时服务器挂掉了, 而在其中加一个阻塞队列就相当于一个缓冲区, 将这些请求都放入缓冲区让线程来慢慢处理. 这就可以防止服务器突然被一波请求给直接冲垮.

       同时如果消费者这边'挂了', 也不会直接影响到生产者这边, 反过来生产者同样如此.

       标准库中的阻塞队列

       在java的标准库中内置了阻塞队列, 如果有需要就可以直接引用:

BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>();

特性:

  1. BlockingQueue是一个接口, 真正的实现类是LinkedBlockingQueue,
  2. put和take方法是阻塞队列的常用方法, 其中put是用于入队列, take是出队列
  3. BlockingQueue也有offer, poll, peek等方法, 但是这些方法都不带则色特性

一个简单的例子:

public class ThreadDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>();
        Thread customer = new Thread(()->{
            while(true) {
                int value;
                try {
                    value = blockingDeque.take();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("消费元素: " + value);
            }
        }, "消费者");
        customer.start();
 
        Thread producer = new Thread(()->{
            Random random = new Random();
            while (true) {
                int num = random.nextInt(100);
                System.out.println("生产元素: " + num );
                try {
                    blockingDeque.put(num);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        },"生产者");
        producer.start();
 
        customer.join();
        producer.join();
    }
}

       阻塞队列实现

       我们说, 阻塞队列其实也是队列, 只不过是在队列的基础上增加了阻塞的功能:

       增加synchronized进行加锁控制, 使用size标记法实现循环队列.

class MyBlockingQueue {
    private int[] items = new int[1000];
    volatile private int head = 0;
    volatile private int tail = 0;
    volatile private int size = 0;
 
    synchronized public void put(int val) throws InterruptedException {
        if (size == items.length) {
            // 如果队列满了, 就必须阻塞等待
            this.wait();
        }
        items[tail] = val;
        tail++;
        if (tail == items.length) {
            tail = 0;
        }
        size++;
        this.notify();
    }
 
    synchronized public Integer take() throws InterruptedException {
        if (size == 0) {
            this.wait();
        }
        int value = items[head];
        head++;
        if (head == items.length) {
            head = 0;
        }
        size--;
        this.notify();
        return value;
    }
}

理解:

    public void put(int value) throws InterruptedException {
        synchronized (this) {
            // 此处最好使用 while.
            // 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
            // 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能又已经队列满了
            // 就只能继续等待
            while (size == items.length) {
                wait();
           }
            items[tail] = value;
            tail = (tail + 1) % items.length;
            size++;
            notifyAll();
       }
   }

       这个put方法, 对立面的内容进行加锁操作, 此时, 每个对象对这个队列进行在put操作的时候都会加锁,  如果队列满了就会进入wait阻塞.   同时, 在放入元素的时候, 解除队列为空的阻塞状态

这里最好是使用while循环来控制wait, 因为在使用notifyAll时回打开所有此所对象的线程, 也就是会被唤醒, 但是这个时候size == items.length ,仍然需要继续等待wait.

定时器

       什么是定时器? 定时器是软件开发中的一个重要组件, 类似于一个闹钟, 达到指定的时间后就会执行某个特定的代码.

       标准库中的定时器

       在java标准库中提供了一个Timer类, 其核心方法为schedule

schedule方法中包含了两个参数, 第一个是指定即将要执行的代码, 第二个参数是, 指定多长时间后,执行.

例如:

        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("hello world");
            }
        });

       定时器的实现

定时器的构成:

  • 一个优先级阻塞队列:  阻塞队列中的任务都有各自的执行时刻, 最先执行的任务一定是设定时间最短的(delay最小的), 使用带优先级队列就可以高效的把这个delay最小的任务找出来.
  • 队列中每一个元素是一个TimerTask对象
  • Task中带有一个时间属性, 队首元素就是即将被执行的元素
  • 同时有一个worker线程扫描这个队列队首元素, 看这个队首元素是否需要执行

(1) Timer类提供的核心接口为schedule, 用于注册一个任务, 然后指定多长时间后执行

public class Timer {
    public void schedule(Runnable command, long after) {
 // TODO
   }
}

(2) Task类用来描述一个任务(作为Timer的内部类) , 里面包含一个Runnable对象和一个time(毫秒时间戳)

static class Task implements Comparable<Task> {
        private Runnable command;
        private long time;
        public Task(Runnable command, long time) {
            this.command = command;
            // time 中存的是绝对时间, 超过这个时间的任务就应该被执行
            this.time = System.currentTimeMillis() + time;
       }
        public void run() {
            command.run();
       }
        @Override
        public int compareTo(Task o) {
            // 谁的时间小谁排前面
            return (int)(time - o.time);
       }
   }
}

(3) Timer实例中, 核心数据结构为PriorityBlockingQueue(优先级阻塞队列, 提供take和put方法, take获取队首元素), 然后通过schedule来往里面插入数据

public Mythimer() {
        Thread t= new Thread(()->{
           while (true) {
               try {
                    synchronized (locker){
                       MyTask myTask = queue.take();
                       long currentTime = System.currentTimeMillis();
                       if (myTask.time <= currentTime) {
                           // 时间到了, 执行任务
                           myTask.runnable.run();
                       } else {
                           // 时间还没到
                           // 把取出的任务塞回去
                           queue.put(myTask);
                           locker.wait(currentTime - myTask.time);
                       }
                   }
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           }
        });
 
    }
 

当使用MyTimer mytimer = new MyTimer(); 调用这个无参构造方法的时候, 会创建一个线程Thread t来扫描这个PriorityBlockingQueue, 每次都拿出队首元素, 也就是:

MyTask myTask = queue.take();
long currentTime = System.currentTimeMillis();

拿出来之后if判断这个队首元素是否需要被执行, 通过设定的时间与当前的计算机系统时间来比较判断. 如果到了就调用run方法执行, 否则就将这个任务塞回优先级阻塞队列, 然后进入wait阻塞等待, 等待时间为这个被放进去的任务的将要被执行的等待时间.

这里的while(true), 是一个死循环, 他的执行速度非常的块, 也会占用系统资源, 计算机每秒访问这个队列很多次, 但是队首元素仍然在等待时间.

public void schedule(Runnable runnable, long delay) {
        MyTask myTask = new MyTask(runnable,delay);
        queue.put(myTask);
        synchronized (locker) {
            locker.notify();
        }
    }

但是, 如果有其他的将会被更早的执行的任务插入插入队列的话, 那么之前的队首元素就不是最先执行的, 但是现在仍然在wait等待, 也就是现在需要使用notify将其唤醒. 然后重新从优先级阻塞队列里面取到剩余时间最短的任务.

       完整代码

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.PriorityBlockingQueue;
 
class MyTimer  {
    // 带有优先级的阻塞队列 (核心数据结构)
    private PriorityBlockingQueue<MyTask> queue = new PriorityBlockingQueue<>(); // 这里的元素是什么呢?
 
    // 此处的delay是一个形如3000 这样的数字()
    public void schedule(Runnable runnable, long delay) {
        synchronized (locker) {
            MyTask myTask = new MyTask(runnable,delay);
            queue.put(myTask);
            locker.notify();
        }
    }
 
    // 构造线程执行任务
    public MyTimer() {
        Thread t= new Thread(()->{
           while (true) {
               try {
                    synchronized (locker){
                        while (queue.isEmpty()) {
                            locker.wait();
                        }
                       MyTask myTask = queue.take();
                       long currentTime = System.currentTimeMillis();
                       if (myTask.time <= currentTime) {
                           // 时间到了, 执行任务
                           myTask.runnable.run();
                       } else {
                           // 时间还没到
                           // 把取出的任务塞回去
                           queue.put(myTask);
                           locker.wait(myTask.time - currentTime);
                       }
                   }
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           }
        });
        t.start();
    }
 
    // 创建锁对象
    private Object locker= new Object();
 
}
 
// 创建一个类, 表示两方面的信息
// 1.执行的任务
// 2.任务什么时候执行
 
class MyTask implements Comparable<MyTask>{
    // Runnable实现类
    public Runnable runnable;
    // 什么时间点执行(实际执行时间)
    public long time;
 
    public MyTask(Runnable runnable, long delay) {
        this.runnable = runnable;
        this.time = System.currentTimeMillis() + delay;
    }
 
    @Override
    public int compareTo(MyTask o) {
        return (int) (this.time - o.time);
    }
}
 
public class TestDemoMyTimer {
    public static void main(String[] args) {
        MyTimer myTimer = new MyTimer();
        myTimer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("hello4");
            }
        }, 4000);
        myTimer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("hello3");
            }
        }, 3000);
        myTimer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("hello2");
            }
        }, 2000);
        myTimer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("hello1");
            }
        }, 1000);
 
        System.out.println("hello0");
    }
}

线程池

       解释

下面解释引自百度:

       线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

       线程池最大的好处就是减少每次创建\ 销毁线程的消耗 : 线程的过多会带来系统线程调度的开销, 进而影响局部或者整体性的性能, 但是线程池维护着多个线程, 等待线程管理者分配可执行的任务. 这就避免了在处理短时间任务时创建与销毁线程的代价.

       Java标准库中的线程池

               Executors

1.使用工厂方法: Excurtors.newFixedThreadPool(n)

返回值为ExecutorService , 创建出包含nThreads个固定大小线程的线程池.

2.使用: Executors.newCachedThreadPool()

创建动态大小的线程池, 不会设置固定值, 按需创建, 用完之后也不会销毁, 留着以后备用

3. newSingleThreadExecutor 创建只包含单个线程的线程池

4. newScheduledThreadpool 设定 延迟时间后执行命令, 或者定期执行命令

               ExecutorService

使用pool.submit(Runnable runnable) 来注册传入一个线程到线程池:

public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        pool.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("hello !!!");
            }
        });
 
    }

               ThreadPoolExecutor

Executors本质上是一个ThreadPoolExecutor类的封装, ThreadPoolExecutor提供了更多的可选参数, 可以进一步细化操作.

ThreadPoolExecutor构造方法:

 

ThreadPoolExecutor(
int corePoolSize,
 int maximumPoolSize,
 long keepAliveTime,
 TimeUnit unit,
 BlockingQueue<Runnable> workQueue,
 RejectedExecutionHandler handler
) 
  • corePoolSize为核心线程数,
  • maximumPoolSize是最大线程数:
    如果当前任务比较多, 线程池就会多创建一些'临时线程', 如果当前任务比较少, 线程池就销毁一些临时线程
  • keepAliveTime 临时线程的存活时间:
    当当前任务比较少的时候, 临时线程是不会被立即销毁的, 它不会立即销毁, 而是等待这个         keepAlivePool最大存活时间.
  • unit 时间单位(s,min,ms)
  • workQueue 阻塞队列 :
    线程池里面有很多任务, 也是通过阻塞队列的形式来管理的, 可以通过手动指定一个workQueue阻塞队列给线程池, 此时就能很方便的控制里面的线程
  • handler 拒绝策略
    线程池的拒绝策略, 如果线程池满了, 继续往里面添加策略, 该如何拒绝

       标准库提供的拒绝策略

  • ThreadPoolExecutor.AbortPolicy: 如果满了, 继续添加任务, 添加操作直接抛出异常
  • ThreadPoolExecutor.CallerRunsPolicy: 添加的线程自己负责执行这个任务
  • ThreadPoolExecutor.DiscardOldestPolicy: 丢弃最老的任务
  • ThreadPoolExecutor.DiscardPolicy : 丢弃最新的任务

最老任务最新任务: 最老任务, 也就是管理线程的阻塞队列的队首元素, 不执行了, 就可以直接删除了.

       线程池的实现

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
 
public class MyThreadPool {
    // 阻塞队列用来存放任务
    private BlockingDeque<Runnable> queue = new LinkedBlockingDeque<>();
 
    // 提交任务
    public void submit(Runnable runnable) throws InterruptedException {
        queue.put(runnable);
    }
 
    public MyThreadPool(int n) {
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(()-> {
                try {
                    // 不断的取元素
                    while (true) {
                        Runnable runnable = queue.take();
                        runnable.run();
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
            t.start();
        }
    }
 
    public static void main(String[] args) throws InterruptedException {
        MyThreadPool pool = new MyThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            int lam = i;
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("hello: " + lam);
                }
            });
        }
    }
 
}
  • 使用构造方法, 创建10个线程, 来处理BlockingQueue里面的Runnable任务, 如果队列为空. BlockingQueue自动阻塞等待
  • 其中的int lam是关于lambda表达式的变量捕获, 由于i在不断的修改, 并不能直接捕获, 所以需要赋值给一个每次创建都是 新的变量, 且在生命周期结束之前都没有被修改的变量lam
  • n的值的设计, 这里面的n的值并不是越多越好,  不同的程序, 需要的线程数是不一样的, 最合适的大小需要通过测试来鉴定


目录
相关文章
|
2天前
|
缓存 NoSQL Java
Java高并发实战:利用线程池和Redis实现高效数据入库
Java高并发实战:利用线程池和Redis实现高效数据入库
12 0
|
2天前
|
搜索推荐 Java 程序员
【案例解析】从菜鸟到高手,如何优雅地使用Java条件语句?
【6月更文挑战第14天】本文介绍了Java编程中条件语句的重要性,特别是if-else和switch的使用。通过四个案例,展示了如何优雅地运用这些语句:案例一演示了基础if-else用于用户登录响应;案例二利用switch处理枚举类型,如订单状态;案例三展示了条件语句的嵌套与组合,用于游戏评分系统;案例四探讨了代码优化与重构,减少冗长的if-else结构。文章强调,掌握条件语句能提升编码效率和代码质量,鼓励开发者不断实践以写出高效优雅的代码。
|
2天前
|
监控 Java API
Java 程序设计 第八章 线程
Java 程序设计 第八章 线程
|
2天前
|
存储 安全 Java
Java多线程编程--JUC
Java多线程编程
|
3天前
|
Java API
详细探究Java多线程的线程状态变化
Java多线程的线程状态主要有六种:新建(NEW)、可运行(RUNNABLE)、阻塞(BLOCKED)、等待(WAITING)、超时等待(TIMED_WAITING)和终止(TERMINATED)。线程创建后处于NEW状态,调用start()后进入RUNNABLE状态,表示准备好运行。当线程获得CPU资源,开始执行run()方法时,它处于运行状态。线程可以因等待锁或调用sleep()等方法进入BLOCKED或等待状态。线程完成任务或发生异常后,会进入TERMINATED状态。
|
3天前
|
存储 安全 Java
Java多线程中线程安全问题
Java多线程中的线程安全问题主要涉及多线程环境下对共享资源的访问可能导致的数据损坏或不一致。线程安全的核心在于确保在多线程调度顺序不确定的情况下,代码的执行结果始终正确。常见原因包括线程调度随机性、共享数据修改以及原子性问题。解决线程安全问题通常需要采用同步机制,如使用synchronized关键字或Lock接口,以确保同一时间只有一个线程能够访问特定资源,从而保持数据的一致性和正确性。
|
Java
Java中需要注意的一些案例
Java中需要注意的一些案例
103 0
|
4天前
|
安全 Java API
Java并发基础-启动和终止线程
Java并发基础-启动和终止线程
13 0
|
4天前
|
Java 调度
Java并发基础-线程简介(状态、常用方法)
Java并发基础-线程简介(状态、常用方法)
11 0