JUC 常用 4 大并发工具类是哪几个?(面试必问)(2)

简介: JUC 常用 4 大并发工具类是哪几个?(面试必问)(2)

Semaphore:

Semaphore,俗称信号量,作用于控制同时访问某个特定资源的线程数量,用在流量控制

一说特定资源控制,那么第一时间就想到了数据库连接..

之前用等待超时模式写了一个数据库连接池,打算用这个Semaphone也写一个


/**
 * Creates a {@code Semaphore} with the given number of
 * permits and nonfair fairness setting.
 *
 * @param permits the initial number of permits available.
 *        This value may be negative, in which case releases
 *        must occur before any acquires will be granted.
 */
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

在源码中可以看到在构建Semaphore信号量的时候,需要传入许可证的数量,这个数量就是资源的最大允许的访问的线程数


接下里用信号量实现一个数据库连接池


连接对象

package org.dance.day2.util.pool;
import org.dance.tools.SleepTools;
import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
 * 数据库连接
 * @author ZYGisComputer
 */
public class SqlConnection implements Connection {
    /**
     * 获取数据库连接
     * @return
     */
    public static final Connection fetchConnection(){
        return new SqlConnection();
    }
    @Override
    public void commit() throws SQLException {
        SleepTools.ms(70);
    }
    @Override
    public Statement createStatement() throws SQLException {
        SleepTools.ms(1);
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        return null;
    }
    @Override
    public CallableStatement prepareCall(String sql) throws SQLException {
        return null;
    }
    @Override
    public String nativeSQL(String sql) throws SQLException {
        return null;
    }
    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
    }
    @Override
    public boolean getAutoCommit() throws SQLException {
        return false;
    }
    @Override
    public void rollback() throws SQLException {
    }
    @Override
    public void close() throws SQLException {
    }
    @Override
    public boolean isClosed() throws SQLException {
        return false;
    }
    @Override
    public DatabaseMetaData getMetaData() throws SQLException {
        return null;
    }
    @Override
    public void setReadOnly(boolean readOnly) throws SQLException {
    }
    @Override
    public boolean isReadOnly() throws SQLException {
        return false;
    }
    @Override
    public void setCatalog(String catalog) throws SQLException {
    }
    @Override
    public String getCatalog() throws SQLException {
        return null;
    }
    @Override
    public void setTransactionIsolation(int level) throws SQLException {
    }
    @Override
    public int getTransactionIsolation() throws SQLException {
        return 0;
    }
    @Override
    public SQLWarning getWarnings() throws SQLException {
        return null;
    }
    @Override
    public void clearWarnings() throws SQLException {
    }
    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }
    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }
    @Override
    public Map<String, Class<?>> getTypeMap() throws SQLException {
        return null;
    }
    @Override
    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
    }
    @Override
    public void setHoldability(int holdability) throws SQLException {
    }
    @Override
    public int getHoldability() throws SQLException {
        return 0;
    }
    @Override
    public Savepoint setSavepoint() throws SQLException {
        return null;
    }
    @Override
    public Savepoint setSavepoint(String name) throws SQLException {
        return null;
    }
    @Override
    public void rollback(Savepoint savepoint) throws SQLException {
    }
    @Override
    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
    }
    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }
    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
        return null;
    }
    @Override
    public Clob createClob() throws SQLException {
        return null;
    }
    @Override
    public Blob createBlob() throws SQLException {
        return null;
    }
    @Override
    public NClob createNClob() throws SQLException {
        return null;
    }
    @Override
    public SQLXML createSQLXML() throws SQLException {
        return null;
    }
    @Override
    public boolean isValid(int timeout) throws SQLException {
        return false;
    }
    @Override
    public void setClientInfo(String name, String value) throws SQLClientInfoException {
    }
    @Override
    public void setClientInfo(Properties properties) throws SQLClientInfoException {
    }
    @Override
    public String getClientInfo(String name) throws SQLException {
        return null;
    }
    @Override
    public Properties getClientInfo() throws SQLException {
        return null;
    }
    @Override
    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
        return null;
    }
    @Override
    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
        return null;
    }
    @Override
    public void setSchema(String schema) throws SQLException {
    }
    @Override
    public String getSchema() throws SQLException {
        return null;
    }
    @Override
    public void abort(Executor executor) throws SQLException {
    }
    @Override
    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
    }
    @Override
    public int getNetworkTimeout() throws SQLException {
        return 0;
    }
    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return null;
    }
    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return false;
    }
}


连接池对象

package org.dance.day2.util.pool;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.Semaphore;
/**
 * 使用信号量控制数据库的链接和释放
 *
 * @author ZYGisComputer
 */
public class DBPoolSemaphore {
    /**
     * 池容量
     */
    private final static int POOL_SIZE = 10;
    /**
     * useful 代表可用连接
     * useless 代表已用连接
     *  为什么要使用两个Semaphore呢?是因为,在连接池中不只有连接本身是资源,空位也是资源,也需要记录
     */
    private final Semaphore useful, useless;
    /**
     * 连接池
     */
    private final static LinkedList<Connection> POOL = new LinkedList<>();
    /**
     * 使用静态块初始化池
     */
    static {
        for (int i = 0; i < POOL_SIZE; i++) {
            POOL.addLast(SqlConnection.fetchConnection());
        }
    }
    public DBPoolSemaphore() {
        // 初始可用的许可证等于池容量
        useful = new Semaphore(POOL_SIZE);
        // 初始不可用的许可证容量为0
        useless = new Semaphore(0);
    }
    /**
     * 获取数据库连接
     *
     * @return 连接对象
     */
    public Connection takeConnection() throws InterruptedException {
        // 可用许可证减一
        useful.acquire();
        Connection connection;
        synchronized (POOL) {
            connection = POOL.removeFirst();
        }
        // 不可用许可证数量加一
        useless.release();
        return connection;
    }
    /**
     * 释放链接
     *
     * @param connection 连接对象
     */
    public void returnConnection(Connection connection) throws InterruptedException {
        if(null!=connection){
            // 打印日志
            System.out.println("当前有"+useful.getQueueLength()+"个线程等待获取连接,,"
                    +"可用连接有"+useful.availablePermits()+"个");
            // 不可用许可证减一
            useless.acquire();
            synchronized (POOL){
                POOL.addLast(connection);
            }
            // 可用许可证加一
            useful.release();
        }
    }
}

测试类:

package org.dance.day2.util.pool;
import org.dance.tools.SleepTools;
import java.sql.Connection;
import java.util.Random;
/**
 * 测试Semaphore
 * @author ZYGisComputer
 */
public class UseSemaphore {
    /**
     * 连接池
     */
    public static final DBPoolSemaphore pool = new DBPoolSemaphore();
    private static class BusiThread extends Thread{
        @Override
        public void run() {
            // 随机数工具类 为了让每个线程持有连接的时间不一样
            Random random = new Random();
            long start = System.currentTimeMillis();
            try {
                Connection connection = pool.takeConnection();
                System.out.println("Thread_"+Thread.currentThread().getId()+
                        "_获取数据库连接耗时["+(System.currentTimeMillis()-start)+"]ms.");
                // 模拟使用连接查询数据
                SleepTools.ms(100+random.nextInt(100));
                System.out.println("查询数据完成归还连接");
                pool.returnConnection(connection);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            BusiThread busiThread = new BusiThread();
            busiThread.start();
        }
    }
}

测试返回结果:

Thread_11_获取数据库连接耗时[0]ms.
Thread_12_获取数据库连接耗时[0]ms.
Thread_13_获取数据库连接耗时[0]ms.
Thread_14_获取数据库连接耗时[0]ms.
Thread_15_获取数据库连接耗时[0]ms.
Thread_16_获取数据库连接耗时[0]ms.
Thread_17_获取数据库连接耗时[0]ms.
Thread_18_获取数据库连接耗时[0]ms.
Thread_19_获取数据库连接耗时[0]ms.
Thread_20_获取数据库连接耗时[0]ms.
查询数据完成归还连接
当前有40个线程等待获取连接,,可用连接有0个
Thread_21_获取数据库连接耗时[112]ms.
查询数据完成归还连接
...................查询数据完成归还连接
当前有2个线程等待获取连接,,可用连接有0个
Thread_59_获取数据库连接耗时[637]ms.
查询数据完成归还连接
当前有1个线程等待获取连接,,可用连接有0个
Thread_60_获取数据库连接耗时[660]ms.
查询数据完成归还连接
当前有0个线程等待获取连接,,可用连接有0个
查询数据完成归还连接...................
当前有0个线程等待获取连接,,可用连接有8个
查询数据完成归还连接
当前有0个线程等待获取连接,,可用连接有9个


通过执行结果可以很明确的看到,一上来就有10个线程获取到了连接,,然后后面的40个线程进入阻塞,然后只有释放链接之后,等待的线程就会有一个拿到,然后越后面的线程等待的时间就越长,然后一直到所有的线程执行完毕


最后打印的可用连接有九个不是因为少了一个是因为在释放之前打印的,不是错误


从结果中可以看到,我们对连接池中的资源的到了控制,这就是信号量的流量控制




Exchanger:

Exchanger,俗称交换器,用于在线程之间交换数据,但是比较受限,因为只能两个线程之间交换数据

/**
 * Creates a new Exchanger.
 */
public Exchanger() {
    participant = new Participant();
}


这个构造函数没有什么好说的,也没有入参,只有在创建的时候指定一下需要交换的数据的泛型即可,下面看代码

package org.dance.day2.util;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Exchanger;
/**
 * 线程之间交换数据
 * @author ZYGisComputer
 */
public class UseExchange {
    private static final Exchanger<Set<String>> exchanger = new Exchanger<>();
    public static void main(String[] args) {
        new Thread(){
            @Override
            public void run() {
                Set<String> aSet = new HashSet<>();
                aSet.add("A");
                aSet.add("B");
                aSet.add("C");
                try {
                    Set<String> exchange = exchanger.exchange(aSet);
                    for (String s : exchange) {
                        System.out.println("aSet"+s);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();
        new Thread(){
            @Override
            public void run() {
                Set<String> bSet = new HashSet<>();
                bSet.add("1");
                bSet.add("2");
                bSet.add("3");
                try {
                    Set<String> exchange = exchanger.exchange(bSet);
                    for (String s : exchange) {
                        System.out.println("bSet"+s);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();
    }
}

执行结果:

bSetAbSetBbSetCaSet1aSet2aSet3

通过执行结果可以清晰的看到,两个线程中的数据发生了交换,这就是Exchanger的线程数据交换了

以上就是JUC的4大常用并发工具类了


相关文章
|
3月前
|
存储 缓存 并行计算
【面试问题】JDK并发类库提供的线程池实现有哪些?
【1月更文挑战第27天】【面试问题】JDK并发类库提供的线程池实现有哪些?
|
3月前
|
存储 前端开发 JavaScript
【面试题】面试官问:如果有100个请求,你如何使用Promise控制并发?
【面试题】面试官问:如果有100个请求,你如何使用Promise控制并发?
|
3月前
|
存储 前端开发 JavaScript
面试官问:如果有100个请求,你如何使用Promise控制并发?
面试官问:如果有100个请求,你如何使用Promise控制并发?
|
2月前
|
存储 前端开发 JavaScript
前端面试:如何实现并发请求数量控制?
前端面试:如何实现并发请求数量控制?
84 0
|
3月前
|
存储 缓存 NoSQL
《吊打面试官》系列-Redis双写一致性、并发竞争、线程模型
《吊打面试官》系列-Redis双写一致性、并发竞争、线程模型
40 0
|
3月前
|
安全 Linux Go
golang面试:golang并发与多线程(三)
golang面试:golang并发与多线程(三)
45 0
|
4月前
|
Java 数据库 索引
最强阿里及大厂350道面试大全:框架+数据库+并发+开源+微服务
无论是对于刚入行工作还是已经工作几年的java开发者来说,面试求职始终是你需要直面的一件事情。首先梳理自己的知识体系,针对性准备,会有事半功倍的效果。我们往往会把重点放在技术上,而忽略了人事部分,实际上人事面试也会影响到最终的结果,把每一个环节做好,最终的结果自然不会差。
|
4月前
|
消息中间件 算法 NoSQL
45k以上突击面试必备,redis+mysql+并发+spring+算法+导图等
今天小编给大家带来的一篇关于Java面试相关的电子文档资源,介绍了关于Java、面试题方面的内容,本书是由Java官网出版,格式为DOC,资源大小62.5 MB,目前豆瓣、亚马逊、当当、京东等电子书综合评分为:8.7。
|
5月前
|
存储 安全 Java
Java并发篇:6个必备的Java并发面试种子题目
文章涉及了几个常见的并发编程相关的主题。首先,线程的创建和生命周期是面试中常被问及的话题,面试官可能会询问如何创建线程、线程的状态转换以及如何控制线程的执行顺序等。其次,synchronized关键字是用于实现线程同步的重要工具,面试中可能会涉及到它的使用场景以及与其他同步机制的比较。此外,抽象队列同步器(AQS)是Java并发编程中的核心概念,了解其原理和应用场景可以展示对并发编程的深入理解。最后,面试中可能会考察对Java线程池和Fork/Join框架的了解,包括它们的使用方法、优势和适用场景等。种子题目务必学会
|
5月前
|
Java
面试java并发~(lock、volatile、cas)
面试java并发~(lock、volatile、cas)
42 0