简介
线程的定制化通信:顾名思义多线程的循环交替执行。 有一个很经典的案例如下:
三个线程之间按顺序调用,实现 A -> B -> C 三个线程启动,要求如下:
AA打印5次,BB打印10次,CC打印15次
接着
AA 打印5次,BB打印10次,CC打印15次
……
循环10轮
大致通过JDK的支持梳理了下面的几种方案
- 通过线程不加锁定义状态变量(sleep)
- 通过synchronized关键字, 结合的wait与notifyAll
- Lock接口中 通过ReentrantLock的lock以及unlock
- Lock接口中 ReentrantLock结合Condition
- Lock接口中 Semaphore信号量方式
方法 / 步骤
一:通过线程不加锁定义状态变量(sleep)
通过线程配合sleep不加锁定义状态变量(非线程安全,可能会少打印)
启动三个线程,按照如下要求:
AA打印5此,BB打印10次,CC打印15次,一共进行10轮
public class ThreadOfSleep {
private static String message = "AA";
private static Integer loop = 1;
public void TdSleep() {
new Thread(() -> {
int count = 1;
while (count <= 5) {
while (!message.equals("AA")) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(message);
count = count + 1;
}
//while end
message = "BB";
}, "AA").start();
new Thread(() -> {
int count = 1;
while (count <= 10) {
while (!message.equals("BB")) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//while end
System.out.println(message);
count = count + 1;
}
message = "CC";
}, "BB").start();
new Thread(() -> {
int count = 1;
while (count <= 15) {
while (!message.equals("CC")) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//while end
System.out.println(message);
count = count + 1;
}
message = "AA";
// loop+=1;
// System.out.println("current loop: "+ loop);
}, "CC").start();
}
static class ThreadOfSleepTest {
public static void main(String[] args) {
ThreadOfSleep threadOfSleep = new ThreadOfSleep();
for (int i = 1; i <= 10; i++) {
threadOfSleep.TdSleep();
}
}
}
二:通过synchronized关键字, 结合的wait与notifyAll
通过synchronized锁定 对象标志位的 wait与 notifyAll 定制线程间的通信
启动三个线程,按照如下要求:AA打印5次,BB打印10次,CC打印15次,一共进行10轮
public class ThreadOfSynchronized {
private static String message = "AA";
private static Integer loop = 1;
private static Object lock = new Object();
public void ThreadOfSync() {
new Thread(() -> {
int count = 1;
synchronized (lock) {
while (count <= 5) {
while (!message.equals("AA")) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(message);
count = count + 1;
}
//while end
message = "BB";
lock.notifyAll();
}
//sync end
}, "AA").start();
new Thread(() -> {
int count = 1;
synchronized (lock) {
while (count <= 10) {
while (!message.equals("BB")) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//while end
System.out.println(message);
count = count + 1;
}
message = "CC";
lock.notifyAll();
}
//sync end
}, "BB").start();
new Thread(() -> {
int count = 1;
synchronized (lock) {
while (count <= 15) {
while (!message.equals("CC")) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//while end
System.out.println(message);
count = count + 1;
}
message = "AA";
loop+=1;
System.out.println("current loop: "+ loop);
lock.notifyAll();
}
//sync end
}, "CC").start();
}
public static class ThreadOfSynchronizedTest {
public static void main(String[] args) {
ThreadOfSynchronized threadOfSynchronized = new ThreadOfSynchronized();
for (int i = 1; i <= 10; i++) {
threadOfSynchronized.ThreadOfSync();
}
}
}
}
三:通过ReentrantLock的lock以及unlock
通过ReentrantLock的lock以及unlock 进行定制线程
交替打印AA BB CC 十次 通过前面的线程 循环带出后面线程
public class ThreadOfRLock {
// 通过JDK5中的Lock锁来保证线程的访问的互斥
static Lock rLock = new ReentrantLock();
//通过state的值来确定是否打印
private static int state = 1;
static class ThreadA extends Thread {
@Override
public void run() {
for (int i = 1; i <= 10;) {
try {
rLock.lock();
// 多线程并发,不能用if,必须用循环测试等待条件,避免虚假唤醒
while (state % 3 == 1) {
System.out.println(Thread.currentThread().getName()+"->> AA");
state++;
i++;
}
} finally {
rLock.unlock();
}
}
}
}
static class ThreadB extends Thread {
@Override
public void run() {
for (int i = 1; i <= 10;) {
try {
rLock.lock();
while (state % 3 == 2) {
System.out.println(Thread.currentThread().getName()+"->> BB");
state++;
i++;
}
} finally {
rLock.unlock();
}
}
}
}
static class ThreadC extends Thread {
@Override
public void run() {
for (int i = 1; i <= 10;) {
try {
rLock.lock();
while (state % 3 == 0) {
System.out.println(Thread.currentThread().getName()+"->> CC");
state++;
i++;
}
} finally {
rLock.unlock();
}
}
}
}
public static void main(String[] args) {
new ThreadA().start();
new ThreadB().start();
new ThreadC().start();
}
}
四:ReentrantLock结合Condition
启动三个线程,按照如下要求:
AA打印5此,BB打印10次,CC打印15次,一共进行10轮
每个线程添加一个标志位,是该标志位则执行操作,并且修改为下一个标志位,通知下一个标志位的线程
创建一个可重入锁private Lock lock = new ReentrantLock();
分别创建三个开锁通知private Condition c1 = lock.newCondition();
通过标志位Flag 和Condition 的wait 当前线程, sign(钥匙) 唤醒下一个线程 进行交替打印
public class CustomThread {
//定义标志位
private int flag = 1;
//创建Lock锁
Lock rLock = new ReentrantLock();
//创建3个Condition 环境条件(Condition是绑定当前线程的)
Condition c1 = rLock.newCondition();
Condition c2 = rLock.newCondition();
Condition c3 = rLock.newCondition();
//打印5次,loop表示第几轮
public void print5Count(int loop) throws InterruptedException {
//上锁
rLock.lock();
try {
//打印5次的标志位为1 打印10次标志位为2 打印15次的标志位为3, 非本次条件为等待
while (flag != 1) {
c1.await();
}
//业务处理
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i + ": 轮数:" + loop);
}
//通知 修改标志位2
flag = 2;
//
c2.signal();
} finally {
//释放锁
rLock.unlock();
}
}
//打印10次,loop表示第几轮
public void print10Count(int loop) throws InterruptedException {
//上锁
rLock.lock();
try {
//打印5次的标志位为1 打印10次标志位为2 打印15次的标志位为3, 非本次条件为等待
while (flag != 2) {
c2.await();
}
//业务处理
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i + ": 轮数:" + loop);
}
//通知 修改标志位2
flag = 3;
c3.signal();
} finally {
//释放锁
rLock.unlock();
}
}
//打印5次,loop表示第几轮
public void print15Count(int loop) throws InterruptedException {
//上锁
rLock.lock();
try {
//打印5次的标志位为1 打印10次标志位为2 打印15次的标志位为3, 非本次条件为等待
while (flag != 3) {
c3.await();
}
//业务处理
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i + ": 轮数:" + loop);
}
//通知 标志位为1的
flag = 1;
c1.signal();
} finally {
//释放锁
rLock.unlock();
}
}
public static class CustomThreadTest {
public static void main(String[] args) {
CustomThread customThread = new CustomThread();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
customThread.print5Count(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "AA").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
customThread.print10Count(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "BB").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
customThread.print15Count(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "CC").start();
}
}
}
五:Semaphore信号量方式
定义的全局变量,使用static
通过static Semaphore A = new Semaphore(1);,代表信号量为1
具体其代码块为:
semaphore.acquire(); 获取信号量
semaphore.release();释放信号量
public class ThreadOfSemaphore {
// 以A开始的信号量,初始信号量数量为1
private static Semaphore A = new Semaphore(1);
// B、C信号量,A完成后开始,初始信号数量为0
private static Semaphore B = new Semaphore(0);
private static Semaphore C = new Semaphore(0);
static class ThreadA extends Thread {
@Override
public void run() {
try {
for (int i = 1; i <= 10; i++) {
A.acquire();// A获取信号执行,A信号量减1,当A为0时将无法继续获得该信号量
System.out.println("AA");
B.release();// B释放信号,B信号量加1(初始为0),此时可以获取B信号量
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class ThreadB extends Thread {
@Override
public void run() {
try {
for (int i = 1; i <= 10; i++) {
B.acquire();
System.out.println("BB");
C.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class ThreadC extends Thread {
@Override
public void run() {
try {
for (int i = 1; i <= 10; i++) {
C.acquire();
System.out.println("CC");
A.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new ThreadA().start();
new ThreadB().start();
new ThreadC().start();
}
}