Java并发基础-线程间通信

简介: Java并发基础-线程间通信

线程间通信

volatile和synchronized关键字

volatile修饰的变量,程序访问时都需要在共享内存中去读取,对它的改变也必须更新共享内存,保证了线程对变量访问的可见性。

synchronized:对于 同步块 的实现使用了monitorenter和monitorexit指令,而 同步方法 则是依靠方法修饰符上的ACC_SYNCHRONIZED来完成的。无论采用哪种方式,其本质是对一个对象的监视器monitor进行获取,而这个获取过程是排他的,也就是同一时刻只能有一个线程获取到由synchronized所保护对象的监视器。

等待/通知机制——wait和notify

指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。 等待:wait()、wait(long)、wait(long, int) 通知:notify()、notifyAll() 示例:

    private static Object object = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                synchronized (object) {
                    System.out.println("t1 start object.wait(), time = " + System.currentTimeMillis() / 1000);
                    object.wait();
                    System.out.println("t1 after object.wait(), time = " + System.currentTimeMillis() / 1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Thread t2 = new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                synchronized (object) {
                    System.out.println("t2 start object.notify(), time = " + System.currentTimeMillis() / 1000);
                    object.notify();
                    System.out.println("t2 after object.notify(), time = " + System.currentTimeMillis() / 1000);
                }

                synchronized (object) {
                    System.out.println("t2  hold lock again, time = " + System.currentTimeMillis() / 1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        t1.start();
        t2.start();
    }
复制代码

输出的结果

t1 start object.wait(), time = 1560138112
t2 start object.notify(), time = 1560138116
t2 after object.notify(), time = 1560138116
t2  hold lock again, time = 1560138116
t1 after object.wait(), time = 1560138116
复制代码

以下结论

  1. wait()、notify()和notifyAll()时需要先对调用对象加锁,否则会报java.lang.IllegalMonitorStateException异常。
  2. 调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。
  3. notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。
  4. notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED。
  5. 从wait()方法返回的前提是获得了调用对象的锁。

等待/通知的经典范式

包括 等待方(消费者)和 通知方(生产者)。 等待方遵循以下原则:

  • 获取对象的锁。
  • 如果条件不满足,那么调用对象的wait方法,被通知后任要检查条件。
  • 条件不满足则执行对应的逻辑。 对应代码如下:
synchronized (对象) {
    while (条件不满足) {
        对象.wait();
    }
    对应的处理逻辑
}
复制代码

通知方遵循以下原则:

  • 获取对象的锁。
  • 改变条件。
  • 通知所有在等待在对象上的线程。
synchronized (对象) {
    改变条件
    对象.notifyAll();
}
复制代码

管道输入/输出流

PipedWriter和PipedReader分别是字符管道输出流和字符管道输入流,是字符流中"管道流",可以实现同一个进程中两个线程之间的通信,与PipedOutputStream和PipedInputStream相比,功能类似.区别是前者传输的字符数据,而后者传输的是字节数据.

不同线程之间通信的流程大致是:PipedWriter与PipedReader流进行连接,写入线程通过将字符管道输出流写入数据,实际将数据写到了与PipedWriter连接的PipedReader流中的缓冲区,然后读取线程通过PipedReader流读取之前存储在缓冲区里面的字符数据.

例子

    private static PipedWriter writer;
    private static PipedReader reader;

    public static void main(String[] args) throws InterruptedException, IOException {
        writer = new PipedWriter();
        reader = new PipedReader();
        //绑定输入输出
        writer.connect(reader);
        Thread t = new Thread(() -> {
            int res;
            try {
                while ((res = reader.read()) != -1) {
                    System.out.print((char) res);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        t.start();

        int res;
        while ((res = System.in.read()) != -1) {
            System.out.println(res);
            writer.write(res);
            //按回车结束
            if (res == 10) {
                break;
            }
        }
        writer.close();
    }
复制代码

ThreadLocal

变量值的共享可以使用public static的形式,所有线程都使用同一个变量,如果想实现每一个线程都有自己的共享变量该如何实现呢?JDK中的ThreadLocal类正是为了解决这样的问题。

ThreadLocal类并不是用来解决多线程环境下的共享变量问题,而是用来提供线程内部的共享变量,在多线程环境下,可以保证各个线程之间的变量互相隔离、相互独立。在线程中,可以通过get()/set()方法来访问变量。ThreadLocal实例通常来说都是private static类型的,它们希望将状态与线程进行关联。这种变量在线程的生命周期内起作用,可以减少同一个线程内多个函数或者组件之间一些公共变量的传递的复杂度。

例子

public class ThreadLocalTest {
  static class MyThread extends Thread {
    private static ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
    
    @Override
    public void run() {
      super.run();
      for (int i = 0; i < 3; i++) {
        threadLocal.set(i);
        System.out.println(getName() + " threadLocal.get() = " + threadLocal.get());
      }
    }
  }
  
  public static void main(String[] args) {
    MyThread myThreadA = new MyThread();
    myThreadA.setName("ThreadA");
    
    MyThread myThreadB = new MyThread();
    myThreadB.setName("ThreadB");
    
    myThreadA.start();
    myThreadB.start();
  }
}
复制代码
ThreadA threadLocal.get() = 0
ThreadB threadLocal.get() = 0
ThreadA threadLocal.get() = 1
ThreadA threadLocal.get() = 2
ThreadB threadLocal.get() = 1
ThreadB threadLocal.get() = 2
复制代码

ThreadLocal最简单的实现方式就是ThreadLocal类内部有一个线程安全的Map,然后用线程的ID作为Map的key,实例对象作为Map的value,这样就能达到各个线程的值隔离的效果。

一个数据库连接池的简单实现

1.连接池创建

package com.atguigu.ct.producer.Test.BB;

import java.sql.Connection;
import java.util.LinkedList;

public class ConnectionPool {

    private LinkedList<Connection> pool = new LinkedList<>();

    public ConnectionPool(int initialSize){
        if(initialSize > 0){
            for (int i = 0; i < initialSize ; i++) {
                pool.addLast(ConnectionDriver.createConnection());
            }
        }
    }

    public void releaseConnection(Connection connection){
        if(connection != null){
            synchronized (pool){
                //连接释放后需要进行通知,这样其他消费者能够感知到连接池中已归还了一个连接
                pool.addLast(connection);
                pool.notifyAll();
            }
        }
    }

    //在 mills内无法获取到连接,将返回 null
    public Connection fetchConnection(long mills) throws InterruptedException {
        synchronized (pool){
            //完全超时
            if(mills <= 0){
                while (pool.isEmpty()){
                    pool.wait();
                }
                return pool.removeFirst();
            }else{
                long future = System.currentTimeMillis() + mills;
                long remaining = mills;
                while (pool.isEmpty() && remaining > 0){
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                }
                Connection result = null;
                if(!pool.isEmpty()){
                    result = pool.removeFirst();
                }
                return result;
            }
        }
    }
}

复制代码

2.由于java.sql.Connection 是一个接口,最终的实现是由数据库驱动提供方来实现的,博主只是为了娱乐,就用动态代理构造了一个Connection,改Connection的代理实现仅仅是在commit()方法调用时休眠100毫秒,如下

package com.atguigu.ct.producer.Test.BB;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.concurrent.TimeUnit;

public class ConnectionDriver {

    static class ConnectionHandler implements InvocationHandler {

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if(method.getName().equals("commit")){
                TimeUnit.MILLISECONDS.sleep(100);
            }
            return null;
        }
    }

    //创建一个 Connection 代理,在 commit 时休眠 100 毫秒
    public static final Connection createConnection(){
        return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),
                new Class<?>[]{Connection.class}, new ConnectionHandler());
    }
}

复制代码

3.模拟客户端ConnectionRunner获取、使用、最后释放连接的过程,当他使用时连接将会增加获取到连接的数量,反之,将会增加未获取到连接的数量,如下:

package com.atguigu.ct.producer.Test.BB;

import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionPoolTest {

    static ConnectionPool pool = new ConnectionPool(10);
    //保证所有的ConnectionRunner 能够同时开始
    static CountDownLatch start = new CountDownLatch(1);
    //main 线程将会等待所有 ConnectionRunner 结束才能继续执行
    static CountDownLatch end;

    public static void main(String[] args) throws InterruptedException {
        //线程数量,可以修改线程数量进行观察
        int threadCount = 100;
        end = new CountDownLatch(threadCount);
        int count = 20;
        AtomicInteger got = new AtomicInteger();
        AtomicInteger notGot = new AtomicInteger();
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(new ConnetionRunner(count, got, notGot), "ConnetionRunnerThread");
            thread.start();
        }
        start.countDown();
        end.await();
        System.out.println("total invoke:" + (threadCount * count));
        System.out.println(" got connection : " + got);
        System.out.println(" not got connection : " + notGot);
    }


    static class ConnetionRunner implements Runnable {

        int count;
        AtomicInteger got;
        AtomicInteger notGot;

        public ConnetionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }

        @Override
        public void run() {
            try {
                start.await();
            } catch (InterruptedException e) {
            }
            while (count > 0) {
                try {
                    //从线程池中获取连接,如果1000ms内无法获取到,将会返回null
                    //分别统计连接获取的数量got和未获取到的数量 notGot
                    Connection connection = pool.fetchConnection(1000);
                    if (connection != null) {
                        try {
                            connection.createStatement();
                            connection.commit();
                        } finally {
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        }
                    } else {
                        notGot.incrementAndGet();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    count--;
                }
            }
            end.countDown();
        }
    }
}

复制代码

下面通过使用前一节中的线程池来构造一个简单的Web服务器,这个Web服务器用来处理 HTTP请求,目前只能处理简单的文本和JPG图片内容。这个Web服务器使用main线程不断地接 受客户端Socket的连接,将连接以及请求提交给线程池处理,这样使得Web服务器能够同时处 理多个客户端请求,示例如下所示。

public class SimpleHttpServer {
 
    private int port=8080;
    private ServerSocketChannel serverSocketChannel = null;
    private ExecutorService executorService;
    private static final int POOL_MULTIPLE = 4;
 
    public SimpleHttpServer() throws IOException {
      executorService= Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE);
      serverSocketChannel= ServerSocketChannel.open();
      serverSocketChannel.socket().setReuseAddress(true);
      serverSocketChannel.socket().bind(new InetSocketAddress(port));
      System.out.println("ddd");
    }
 
    public void service() {
      while (true) {
        SocketChannel socketChannel=null;
        try {
          socketChannel = serverSocketChannel.accept();
          executorService.execute(new Handler(socketChannel));
        }catch (IOException e) {
           e.printStackTrace();
        }
      }
    }
 
    public static void main(String args[])throws IOException {
      new SimpleHttpServer().service();
    }
    class Handler implements Runnable{
    private SocketChannel socketChannel;
    public Handler(SocketChannel socketChannel){
      this.socketChannel=socketChannel;
    }
    public void run(){
      handle(socketChannel);
    }
 
    public void handle(SocketChannel socketChannel){
      try {
          Socket socket=socketChannel.socket();
          System.out.println("ddd" +
          socket.getInetAddress() + ":" +socket.getPort());
 
           ByteBuffer buffer=ByteBuffer.allocate(1024);
           socketChannel.read(buffer);
           buffer.flip();
           String request=decode(buffer);
           System.out.print(request);  
 
           StringBuffer sb=new StringBuffer("HTTP/1.1 200 OK\r\n");
           sb.append("Content-Type:text/html\r\n\r\n");
           socketChannel.write(encode(sb.toString()));
 
           FileInputStream in;
           
           String firstLineOfRequest=request.substring(0,request.indexOf("\r\n"));
           if(firstLineOfRequest.indexOf("login.htm")!=-1)
              in=new FileInputStream("/Users/tokou/Documents/post.html");
           else
              in=new FileInputStream("/Users/tokou/Documents/post.html");
 
           FileChannel fileChannel=in.getChannel();
           fileChannel.transferTo(0,fileChannel.size(),socketChannel);
           fileChannel.close();
        }catch (Exception e) {
           e.printStackTrace();
        }finally {
           try{
             if(socketChannel!=null)socketChannel.close();
           }catch (IOException e) {e.printStackTrace();}
        }
    }
    private Charset charset=Charset.forName("GBK");
    public String decode(ByteBuffer buffer){  
      CharBuffer charBuffer= charset.decode(buffer);
      return charBuffer.toString();
    }
    public ByteBuffer encode(String str){  
      return charset.encode(str);
    }
   }
}

复制代码

面试题

2个线程交替打印A1B2C3D4...这样的模式的实现

方法一 LockSupport

public class TestLockSupport {

    static Thread t1=null,t2=null;

    public static void main(String[] args) {

        char[] aI="1234567".toCharArray();
        char[] aC="ABCDEFG".toCharArray();

        t1=new Thread(()->{
            for (char c : aI) {
                System.out.println(c);
                LockSupport.unpark(t2);
                LockSupport.park();
            }
        },"t1");


        t2=new Thread(()->{
            for (char c : aC) {
                LockSupport.park();
                System.out.println(c);
                LockSupport.unpark(t1);
            }
        },"t1");
        t1.start();
        t2.start();
    }
}

复制代码

结果

1
A
2
B
3
C
4
D
5
E
6
F
7
G
复制代码

方法二 用CAS自旋锁+volatitle来实现

复制代码

 enum  ReadyToRun {T1,T2}

  //先定义T1准备运行  而且要设置volatile 线程可见
  static volatile ReadyToRun r=ReadyToRun.T1;

    public static void main(String[] args) {

        char[] aI="1234567".toCharArray();
        char[] aC="ABCDEFG".toCharArray();


        new Thread(()->{
            for (char c : aI) {
                //如果不是T1准备运行 就一直返回空,直到T1运行打印,打印完之后把准备运行的变为T2
                while (r!=ReadyToRun.T1){}
                System.out.println(c);
                r=ReadyToRun.T2;
            }

        },"t1").start();


        new Thread(()->{
            for (char c : aC) {
                //如果不是T2准备运行 就一直返回空,直到T2运行打印,打印完之后把准备运行的变为T1
                while (r!=ReadyToRun.T2){}
                System.out.println(c);
                r=ReadyToRun.T1;
            }

        },"t1").start();

    }
复制代码

结果

1
A
2
B
3
C
4
D
5
E
6
F
7
G

复制代码

方法三 原子类 AtomicInteger

public class TestLockSupport {

    
    //定义一个原子性的对象
    static AtomicInteger thredNo=new AtomicInteger(1);

    public static void main(String[] args) {


        char[] aI="1234567".toCharArray();
        char[] aC="ABCDEFG".toCharArray();


        new Thread(()->{
            for (char c : aI) {
                //如果不是1就一直返回空,直到运行打印,打印完之后把原子对象变成2
                while (thredNo.get()!=1){}
                System.out.println(c);
                thredNo.set(2);
            }

        },"t1").start();


        new Thread(()->{
            for (char c : aC) {
                //如果不是2就一直返回空,直到运行打印,打印完之后把原子对象变成1
                while (thredNo.get()!=2){}
                System.out.println(c);
                thredNo.set(1);
            }

        },"t1").start();

    }
}

复制代码

方法四 也是面试官 想考你的 synchronized wait notiyfy

    public static void main(String[] args) {
    final  Object o=new Object();

        char[] aI="1234567".toCharArray();
        char[] aC="ABCDEFG".toCharArray();


        new Thread(()->{
            synchronized (o){
                for (char c : aI) {
                    try {
                        System.out.println(c);
                        o.wait();
                        o.notify();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                o.notify();
            }


        },"t1").start();


        new Thread(()->{
            synchronized (o){
                for (char c : aC) {
                    System.out.println(c);
                    o.notify();
                    try {
                        o.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                o.notify();
            }


        },"t1").start();

    }
复制代码

结尾

多线程的基础就讲到这里了,大家看完这些应该能够知道,线程的基本概况,接下来我们看看并发的锁吧

目录
相关文章
|
8天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
10天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
10天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。
|
11天前
|
安全 Java 编译器
深入理解Java中synchronized三种使用方式:助您写出线程安全的代码
`synchronized` 是 Java 中的关键字,用于实现线程同步,确保多个线程互斥访问共享资源。它通过内置的监视器锁机制,防止多个线程同时执行被 `synchronized` 修饰的方法或代码块。`synchronized` 可以修饰非静态方法、静态方法和代码块,分别锁定实例对象、类对象或指定的对象。其底层原理基于 JVM 的指令和对象的监视器,JDK 1.6 后引入了偏向锁、轻量级锁等优化措施,提高了性能。
33 3
|
11天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
92 2
|
19天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
46 6
|
27天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
27天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
50 3
|
28天前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####
|
1月前
|
监控 Java 数据库连接
Java线程管理:守护线程与用户线程的区分与应用
在Java多线程编程中,线程可以分为守护线程(Daemon Thread)和用户线程(User Thread)。这两种线程在行为和用途上有着明显的区别,了解它们的差异对于编写高效、稳定的并发程序至关重要。
35 2