高并发编程-自定义带有超时功能的锁

简介: 高并发编程-自定义带有超时功能的锁

20191031000606569.png


概述



20191013201148857.png


我们知道synchronized的机制有一个很重要的特点是:使用synchronized, 当一个线程获取了锁,其他线程只能一直等待,等待这个获取锁的线程释放锁,如果这个线程执行时间很长,其他线程就需要一直等待 。 除非获取锁的线程执行完了该代码块,释放锁或者线程执行发生异常,JVM会使线程自动释放锁。


当然了J.U.C包中 Doug Lea大神已经设计了非常完美的解决方案,我们这里不讨论J.U.C的实现。


我们自己实现一套的话,该如何实现呢? 有几点需要思考


  1. 原有的synchronized功能,必须保证,即一个线程拿到锁后,其他线程必须等待
  2. 谁加的锁,必须由谁来释放
  3. 加入超时功能

好了,开始吧


步骤

自定义超时异常处理类

既然要设计带超时功能的锁, 少不了当超时时,抛出异常,以便上层捕获处理。

public class TimeOutException extends  RuntimeException {
    public TimeOutException(String message){
        super(message);
    }
}


ILock接口

约定几个接口方法: lock 、lock(long timeout)、unlock、getBlockedThread、getBlockedSize 详见代码注释

package com.artisan.customLock;
import java.util.Collection;
public interface ILock {
    /**
     * 加锁
     */
    void lock() throws InterruptedException;
    /**
     * 加锁
     * @param timeout 持有锁的时间,过了该时间(毫秒) 自动释放该锁
     */
    void lock(long timeout) throws InterruptedException,TimeOutException;
    /**
     * 释放锁
     */
    void unlock();
    /**
     * 用于观察 有哪些线程因没有获取到锁被blocked
     * @return
     */
    Collection<Thread> getBlockedThreads();
    /**
     * 被blocked的线程数量
     * @return
     */
    int getBlockedSize();
}


实现类

详见代码注释。 加锁和释放锁方法 使用 synchronized 修饰,否则使用wait && notifyAll抛出异常


import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
public class CustomLock implements ILock {
    // 默认false
    // true: 已经被线程抢到  false: 空闲
    private boolean lockFlag;
    // 用于存储被blocked的线程,方便查看及计算被blocked的线程数量
    Collection<Thread> blockedThreadCollection = new ArrayList<>();
    /**
     * 构造函数中初始化该lockFlag
     */
    public CustomLock(){
        this.lockFlag = false;
    }
    /**
     * synchronized 修饰该方法
     * @throws InterruptedException
     */
    @Override
    public synchronized void lock() throws InterruptedException {
        // 如果其他线程已经获取到了锁,让该线程wait
        while(lockFlag){
            // 加入到blockedThreadCollection
            blockedThreadCollection.add(Thread.currentThread());
            // wait
            this.wait();
        }
        // 如果空闲,将该monitor置为true
        blockedThreadCollection.remove(Thread.currentThread());
        lockFlag = true;
    }
    @Override
    public void lock(long timeout) throws InterruptedException, TimeOutException {
    }
    @Override
    public synchronized void unlock() {
        // 如果是加锁的线程
            // 将Monitor置为空闲
            this.lockFlag = false;
            Optional.of(Thread.currentThread().getName() + " 释放lock").ifPresent(System.out::println);
            // 唤醒其他正在等待的线程
            this.notifyAll();
    }
    @Override
    public Collection<Thread> getBlockedThreads() {
        // blockedThreadCollection 可能被其他线程add 或者remove,这里定义为不可变的集合类型
        return Collections.unmodifiableCollection(blockedThreadCollection);
    }
    @Override
    public int getBlockedSize() {
        return blockedThreadCollection.size();
    }
}

测试

package com.artisan.customLock;
import java.time.LocalTime;
import java.util.Optional;
import java.util.stream.Stream;
public class CustomLockTest {
    public static void main(String[] args) throws InterruptedException {
        CustomLock customLock = new CustomLock();
        // 开启5个线程
        Stream.of("T1", "T2", "T3", "T4", "T5")
                .forEach(name -> new Thread(() -> {
                    // 加锁 处理业务
                    try {
                        // 加锁
                        customLock.lock();
                        Optional.of(Thread.currentThread().getName() + " hold the Monitor")
                                .ifPresent(System.out::println);
                        // 调用业务
                        work();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        // 在finally中释放锁
                        customLock.unlock();
                    }
                }, name).start());
    }
    /**
     * 模拟线程的业务逻辑
     *
     * @throws InterruptedException
     */
    public static void work() throws InterruptedException {
        Optional.of(Thread.currentThread().getName() +
                " begin to work " + LocalTime.now().withNano(0)).ifPresent(System.out::println);
        Thread.sleep(3_000);
    }
}


日志输出:

"E:\Program Files\Java\jdk1.8.0_161\bin\java" "-javaagent:E:\Program Files\JetBrains\IntelliJ IDEA 2017.2.4\lib\idea_rt.jar=53159:E:\Program Files\JetBrains\IntelliJ IDEA 2017.2.4\bin" -Dfile.encoding=UTF-8 -classpath "E:\Program Files\Java\jdk1.8.0_161\jre\lib\charsets.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\deploy.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\access-bridge-64.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\cldrdata.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\dnsns.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jaccess.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jfxrt.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\localedata.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\nashorn.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunec.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunjce_provider.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunmscapi.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunpkcs11.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\zipfs.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\javaws.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jce.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jfr.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jfxswt.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jsse.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\management-agent.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\plugin.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\resources.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\rt.jar;D:\IdeaProjects\mvc\target\classes" com.artisan.customLock.CustomLockTest
T1 hold the Monitor
T1 begin to work 22:19:14
T1 释放lock
T5 hold the Monitor
T5 begin to work 22:19:17
T5 释放lock
T2 hold the Monitor
T2 begin to work 22:19:20
T2 释放lock
T4 hold the Monitor
T4 begin to work 22:19:23
T4 释放lock
T3 hold the Monitor
T3 begin to work 22:19:26
T3 释放lock
Process finished with exit code 0


可以看到 确实是一个线程拿到锁后,其他线程必须等待 。

针对第二点呢: 谁加的锁,必须由谁来释放 .

我们来测试下

存在的问题

针对第二点呢: 谁加的锁,必须由谁来释放 .

我们来测试下 : 假设我们在main线程中调用了unlock方法


20191013222434177.png


重新运行测试,观察日志

T1 hold the Monitor
T1 begin to work 22:24:41
main 释放lock
T5 hold the Monitor
T5 begin to work 22:24:41
T1 释放lock
T2 hold the Monitor
T2 begin to work 22:24:44
T5 释放lock
T4 hold the Monitor
T4 begin to work 22:24:44
T2 释放lock
T3 hold the Monitor
T3 begin to work 22:24:47
T4 释放lock
T3 释放lock
Process finished with exit code 0


T1拿到锁还没有工作完,就被主线程释放了,结果T5又抢到了… 很明显不对了 。

修复存在的问题

见代码


20191013223246773.png


再次运行测试 ,OK


20191013223344670.png


超时功能

 @Override
    public synchronized  void lock(long timeout) throws InterruptedException, TimeOutException {
        // 入参不合理,直接调用lock ,也可抛出异常
        if (timeout <= 0 ) lock();
        // 线程等待的剩余时间
        long  leftTime = timeout;
        // 计算结束时间
        long endTime = System.currentTimeMillis() + timeout;
        while(lockFlag){
            // 如果超时了,抛出异常
            if (leftTime <= 0){
                throw new TimeOutException(Thread.currentThread().getName() + " 超时...");
            }
            // 加入到blockedThreadCollection
            blockedThreadCollection.add(Thread.currentThread());
            // wait 指定的时间
            this.wait(timeout);
            // 计算是否超时
            leftTime = endTime - System.currentTimeMillis();
        }
        // 如果空闲,将该monitor置为true
        blockedThreadCollection.remove(Thread.currentThread());
        this.lockFlag = true;
        // 将当前线程置为lockHolderThread
        this.lockHolderThread = Thread.currentThread();
    }



测试超时功能

package com.artisan.customLock;
import java.time.LocalTime;
import java.util.Optional;
import java.util.stream.Stream;
public class CustomLockTest {
    public static void main(String[] args) {
        CustomLock customLock = new CustomLock();
        // 开启5个线程
        Stream.of("T1", "T2", "T3", "T4", "T5")
                .forEach(name -> new Thread(() -> {
                    // 加锁 处理业务
                    try {
                        // 加锁 最多等待100毫秒,如果100ms,没抢到则中断执行
                        customLock.lock(100);
                        Optional.of(Thread.currentThread().getName() + " hold the Monitor")
                                .ifPresent(System.out::println);
                        // 调用业务
                        work();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (TimeOutException e){
                        Optional.of(Thread.currentThread().getName() + " timeOut")
                                .ifPresent(System.out::println);
                    }finally {
                        // 在finally中释放锁
                        customLock.unlock();
                    }
                }, name).start());
    }
    /**
     * 模拟线程的业务逻辑
     *
     * @throws InterruptedException
     */
    public static void work() throws InterruptedException {
        Optional.of(Thread.currentThread().getName() +
                " begin to work " + LocalTime.now().withNano(0)).ifPresent(System.out::println);
        Thread.sleep(3_000);
    }
}

运行结果:

20191013230718233.png


OK。


CustomLock

package com.artisan.customLock;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
public class CustomLock implements ILock {
    // 默认false
    // true: 已经被线程抢到  false: 空闲
    private boolean lockFlag;
    // 用于存储被blocked的线程,方便查看及计算被blocked的线程数量
    Collection<Thread> blockedThreadCollection = new ArrayList<>();
    // 当前持有锁的线程
    Thread lockHolderThread ;
    /**
     * 构造函数中初始化该lockFlag
     */
    public CustomLock(){
        this.lockFlag = false;
    }
    /**
     * synchronized 修饰该方法
     * @throws InterruptedException
     */
    @Override
    public synchronized void lock() throws InterruptedException {
        // 如果其他线程已经获取到了锁,让该线程wait
        while(lockFlag){
            // 加入到blockedThreadCollection
            blockedThreadCollection.add(Thread.currentThread());
            // wait
            this.wait();
        }
        // 如果空闲,将该monitor置为true
        blockedThreadCollection.remove(Thread.currentThread());
        this.lockFlag = true;
        // 将当前线程置为lockHolderThread
        this.lockHolderThread = Thread.currentThread();
    }
    @Override
    public synchronized  void lock(long timeout) throws InterruptedException, TimeOutException {
        // 入参不合理,直接调用lock ,也可抛出异常
        if (timeout <= 0 ) lock();
        // 线程等待的剩余时间
        long  leftTime = timeout;
        // 计算结束时间
        long endTime = System.currentTimeMillis() + timeout;
        while(lockFlag){
            // 如果超时了,抛出异常
            if (leftTime <= 0){
                throw new TimeOutException(Thread.currentThread().getName() + " 超时...");
            }
            // 加入到blockedThreadCollection
            blockedThreadCollection.add(Thread.currentThread());
            // wait 指定的时间
            this.wait(timeout);
            // 计算是否超时
            leftTime = endTime - System.currentTimeMillis();
        }
        // 如果空闲,将该monitor置为true
        blockedThreadCollection.remove(Thread.currentThread());
        this.lockFlag = true;
        // 将当前线程置为lockHolderThread
        this.lockHolderThread = Thread.currentThread();
    }
    @Override
    public synchronized void unlock() {
        // 如果是加锁的线程
        if(lockHolderThread == Thread.currentThread()){
            // 将Monitor置为空闲
            this.lockFlag = false;
            Optional.of(Thread.currentThread().getName() + " 释放lock" +  LocalTime.now().withNano(0)).ifPresent(System.out::println);
            // 唤醒其他正在等待的线程
            this.notifyAll();
        }
    }
    @Override
    public Collection<Thread> getBlockedThreads() {
        // blockedThreadCollection 可能被其他线程add 或者remove,这里定义为不可变的集合类型
        return Collections.unmodifiableCollection(blockedThreadCollection);
    }
    @Override
    public int getBlockedSize() {
        return blockedThreadCollection.size();
    }
}



相关文章
|
6月前
|
存储 Java
高并发编程之多线程锁和Callable&Future 接口
高并发编程之多线程锁和Callable&Future 接口
78 1
|
6月前
|
缓存 监控 Java
高并发编程之ThreadPool 线程池
高并发编程之ThreadPool 线程池
71 1
|
1月前
|
并行计算 算法 搜索推荐
探索Go语言的高并发编程与性能优化
【10月更文挑战第10天】探索Go语言的高并发编程与性能优化
|
2月前
|
网络协议 Java Linux
高并发编程必备知识IO多路复用技术select,poll讲解
高并发编程必备知识IO多路复用技术select,poll讲解
|
1月前
|
Java Linux 应用服务中间件
【编程进阶知识】高并发场景下Bio与Nio的比较及原理示意图
本文介绍了在Linux系统上使用Tomcat部署Java应用程序时,BIO(阻塞I/O)和NIO(非阻塞I/O)在网络编程中的实现和性能差异。BIO采用传统的线程模型,每个连接请求都会创建一个新线程进行处理,导致在高并发场景下存在严重的性能瓶颈,如阻塞等待和线程创建开销大等问题。而NIO则通过事件驱动机制,利用事件注册、事件轮询器和事件通知,实现了更高效的连接管理和数据传输,避免了阻塞和多级数据复制,显著提升了系统的并发处理能力。
58 0
|
4月前
|
SQL 关系型数据库 MySQL
(八)MySQL锁机制:高并发场景下该如何保证数据读写的安全性?
锁!这个词汇在编程中出现的次数尤为频繁,几乎主流的编程语言都会具备完善的锁机制,在数据库中也并不例外,为什么呢?这里牵扯到一个关键词:高并发,由于现在的计算机领域几乎都是多核机器,因此再编写单线程的应用自然无法将机器性能发挥到最大,想要让程序的并发性越高,多线程技术自然就呼之欲出,多线程技术一方面能充分压榨CPU资源,另一方面也能提升程序的并发支持性。
388 3
|
6月前
|
存储 关系型数据库 MySQL
《MySQL 入门教程》第 05 篇 账户和权限,Java高并发编程详解深入理解pdf
《MySQL 入门教程》第 05 篇 账户和权限,Java高并发编程详解深入理解pdf
|
6月前
|
监控 安全 网络安全
用Haskell语言实现高并发局域网聊天监控功能
使用Haskell构建的局域网聊天监控系统示例,通过`Network.Socket`等库监听并处理UDP消息,实现聊天记录的捕获。代码展示了如何创建UDP套接字接收消息,并打印出来。此外,利用HTTP客户端库发送HTTP请求,可将监控数据自动提交至网站,实现数据的实时管理和安全合规。
162 3
|
6月前
|
Java
高并发编程之JUC 三大辅助类和读写锁
高并发编程之JUC 三大辅助类和读写锁
51 1
|
6月前
|
Java
高并发编程之什么是 Lock 接口
高并发编程之什么是 Lock 接口
68 1

热门文章

最新文章

  • 1
    高并发场景下,到底先更新缓存还是先更新数据库?
    67
  • 2
    Java面试题:解释Java NIO与BIO的区别,以及NIO的优势和应用场景。如何在高并发应用中实现NIO?
    75
  • 3
    Java面试题:设计一个线程安全的单例模式,并解释其内存占用和垃圾回收机制;使用生产者消费者模式实现一个并发安全的队列;设计一个支持高并发的分布式锁
    68
  • 4
    Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
    62
  • 5
    Java面试题:结合建造者模式与内存优化,设计一个可扩展的高性能对象创建框架?利用多线程工具类与并发框架,实现一个高并发的分布式任务调度系统?设计一个高性能的实时事件通知系统
    55
  • 6
    Java面试题:假设你正在开发一个Java后端服务,该服务需要处理高并发的用户请求,并且对内存使用效率有严格的要求,在多线程环境下,如何确保共享资源的线程安全?
    69
  • 7
    在Java中实现高并发的数据访问控制
    42
  • 8
    使用Java构建一个高并发的网络服务
    29
  • 9
    微服务06----Eureka注册中心,微服务的两大服务,订单服务和用户服务,订单服务需要远程调用我们的用,户服务,消费者,如果环境改变,硬编码问题就会随之产生,为了应对高并发,我们可能会部署成一个集
    37
  • 10
    如何设计一个秒杀系统,(高并发高可用分布式集群)
    129