多线程并发工具类

简介: 参考资料《Java并发编程的艺术》《Java编程思想》

1.等待多线程完成的CountDownLatch

    CountDownLatch允许一个或多个线程等待其他线程完成操作。即他可以实现与join()方法相同的功能,而且比join的功能更多。可以在初始化CountDownLatch时传入一个int参数来设置初始计数值,任何在CountDownLatch对象上调用wait()的方法都将被阻塞,直到这个CountDownLatch对象的计数值为0。CountDownLatch被设计为只能触发一次,计数值不能被重置。

    当我们调用CountDownLatch的countDown方法时,计数值N就会减1,CountDownLatch的await方法 会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个 点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,只需要把这个 CountDownLatch的引用传递到线程里即可。

    注意:计数器必须大于等于0,只是等于0时候,计数器就是零,则此时调用await方法时不会阻塞当前线程。CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数器的值。一个线程调用countDown方法发生之前,另外一个线程调用await方法。一个线程调用countDown方法并不会被阻塞,只有调用await()方法的线程才会被阻塞。

public class TestCountDownLatch {
	static CountDownLatch c=new CountDownLatch(8);
	public static void main(String[] args) throws InterruptedException {
		for(int i=1;i<=8;i++){
			Thread t=new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						Thread.sleep(1000);
						System.out.println(Thread.currentThread().getName()+"完成");
						c.countDown();//计数器减1
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			});
			t.start();
		}
		c.await();//主线程会在此处阻塞,直到CountDownLatch的计数器为0才会恢复
		System.out.println("完成所有准备任务");
		System.out.println("主程序开始执行");
	}
}

2.同步屏障CyclicBarrier

    CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

    CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

public class TestCyclicBarrier {
	static CyclicBarrier c=new CyclicBarrier(2);
	public static void main(String[] args) {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)即可解除阻塞
				} catch (Exception e) {
				}
				System.out.println(1);
			}
		}).start();
		try {
			c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)即可解除阻塞
		} catch (Exception e) {
			
		}
		System.out.println(2);
	}

}

    注意:如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),则主线程和子线程会永远等待, 因为没有第三个线程执行await方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。

    CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,保证会优先执行barrierAction,方便处理更复杂的业务场景。

public class TestCyclicBarrier1 {
	static CyclicBarrier c = new CyclicBarrier(2, new A());
	public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
		new Thread(new Runnable() {
			@Override
			public void run() {
				// TODO Auto-generated method stub
				try {
					c.await();
					System.out.println(2);
				} catch (Exception e) {
					// TODO: handle exception
				}
			}
		}).start();
		c.await();
		System.out.println(1);
	}
	static class A implements Runnable {
		@Override
		public void run() {
			System.out.println(3);
		}
	}
}
/*
 * 输出结果:
 * 3
 * 2
 * 1
 */

    因为CyclicBarrier设置了拦截线程的数量是2,所以必须等代码中的第一个线程和线程A 都执行完之后,才会继续执行主线程,所以输出结果为3 2 1。那么此时有一个问题,如果阻塞的线程数大于CyclicBarrier的计数器会怎样?

public class TestCyclicBarrier {
	static CyclicBarrier c=new CyclicBarrier(2);
	public static void main(String[] args) {
		for(int i=1;i<=3;i++){
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)
				} catch (Exception e) {
				}
				System.out.println(Thread.currentThread().getName());
			}
		}).start();
		}
		try {
			c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)
		} catch (Exception e) {
			
		}
		System.out.println(2);
	}

}

根据结果可以知道,CyclicBarrier可以自动重置计数器数量,当拦截线程数量为2时会把从阻塞队列中任意取出两个解除阻塞并执行,如果还有剩余的阻塞队列则会重置计数器,如果剩余阻塞队列数量小于计数器则会阻塞运行,也就是说,如果有阻塞队列数X与计数器N,X%N==0,那么所有线程都会执行,如果X%N!=0,那么会有部分线程处于阻塞状态无法执行。也可以手动调用 reset()方法来进行重置计数器。

    CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier 阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。

public class TestCyclicBarrier {
	static CyclicBarrier c=new CyclicBarrier(2);
	public static void main(String[] args) {
		Thread t=new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)
				} catch (Exception e) {
				}
				System.out.println(Thread.currentThread().getName());
			}
		});
		t.start();
		t.interrupt();
		try {
			c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)
			System.out.println(2);
		} catch (BrokenBarrierException|InterruptedException e) {
			System.out.println(c.isBroken());
		}
		
	}

}
/*
true
*/

3.控制并发线程数的Semaphore

    Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。换句话说,锁(Lock锁或synchronized锁)在任何时刻只允许一个任务访问被加锁的资源,而计数信号量允许n个任务同时访问这个资源,还可以将信号量看作是向外分发使用资源的“许可证”,尽管内部没有这种许可证对象。

    “许可证”的数量是有限的,所以当执有“许可证”的线程数量与“许可证”数量相同时,就会阻止其他线程对共享资源的使用,如果某一个或多个线程使用完共享资源后,就会归还“许可证”,此时Semaphore(信号量)就会将这些归还的“许可证”再次分发给阻塞中的线程。通过这种方式就实现了控制线程并发数。

    3.1 API

  • Semaphore(int permits):构造器,接受一个整型的数字,表示可用的许可证数量。
  • acquire():线程调用该方法获取一个许可证来获取使用共享资源的资格。
  • release():线程使用完共享资源之后调用方法归还许可证。
  • tryAcquire():线程调用该方法尝试获取许可证。
  • int availablePermits():返回此信号量中当前可用的许可证数。
  • int getQueueLength():返回正在等待获取许可证的线程数。
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermits(int reduction):减少reduction个许可证,是个protected方法。
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方 法。

    3.2 应用

    Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程 并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这 时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连 接。这个时候,就可以使用Semaphore来做流量控制。

public class TestSemaphore {
	static Semaphore s=new Semaphore(10);
	static int threadNum=30;
	public static void main(String[] args) {
		for(int i=1;i<=30;i++){
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						s.acquire();
						System.out.println(Thread.currentThread().getName()+"do some work");
						s.release();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					
				}
			}).start();
		}
	}
}

4. 线程间交换数据的Exchanger

    Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也 执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。所以由此可见,Exchanger将会与 生产者-消费者模型相关。

    其应用场景有:Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换 两人的数据,并使用交叉规则得出2个交配结果。Exchanger也可以用于校对工作,比如我们需 要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行 录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致。如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发 生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长。

public class TestExchanger {
	static Exchanger<String> ex=new Exchanger<String>();
	static ExecutorService service=Executors.newFixedThreadPool(2);
	public static void main(String[] args) {
		service.execute(new Runnable() {
			@Override
			public void run() {
				String a="A";
				try {
					ex.exchange(a);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		});
		service.execute(new Runnable() {
			@Override
			public void run() {
				String b="B";
				try {
					String a=ex.exchange(b);
					System.out.println("a录入的是"+a);
					System.out.println("b录入的是"+b);
					System.out.println("a与b是否一致:"+a.equalsIgnoreCase(b));
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		});
		service.shutdown();
	}
}

 

相关文章
|
1月前
|
并行计算 Java 数据处理
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
163 0
|
26天前
|
安全
List并发线程安全问题
【10月更文挑战第21天】`List` 并发线程安全问题是多线程编程中一个非常重要的问题,需要我们认真对待和处理。只有通过不断地学习和实践,我们才能更好地掌握多线程编程的技巧和方法,提高程序的性能和稳定性。
131 59
|
4天前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
|
17天前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
1月前
|
Java
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
29 1
|
2月前
|
网络协议 C语言
C语言 网络编程(十四)并发的TCP服务端-以线程完成功能
这段代码实现了一个基于TCP协议的多线程服务器和客户端程序,服务器端通过为每个客户端创建独立的线程来处理并发请求,解决了粘包问题并支持不定长数据传输。服务器监听在IP地址`172.17.140.183`的`8080`端口上,接收客户端发来的数据,并将接收到的消息添加“-回传”后返回给客户端。客户端则可以循环输入并发送数据,同时接收服务器回传的信息。当输入“exit”时,客户端会结束与服务器的通信并关闭连接。
|
2月前
|
数据采集 消息中间件 并行计算
进程、线程与协程:并发执行的三种重要概念与应用
进程、线程与协程:并发执行的三种重要概念与应用
60 0
|
2月前
|
C语言
C语言 网络编程(九)并发的UDP服务端 以线程完成功能
这是一个基于UDP协议的客户端和服务端程序,其中服务端采用多线程并发处理客户端请求。客户端通过UDP向服务端发送登录请求,并根据登录结果与服务端的新子线程进行后续交互。服务端在主线程中接收客户端请求并创建新线程处理登录验证及后续通信,子线程创建新的套接字并与客户端进行数据交换。该程序展示了如何利用线程和UDP实现简单的并发服务器架构。
|
3月前
|
Rust 并行计算 安全
揭秘Rust并发奇技!线程与消息传递背后的秘密,让程序性能飙升的终极奥义!
【8月更文挑战第31天】Rust 以其安全性和高性能著称,其并发模型在现代软件开发中至关重要。通过 `std::thread` 模块,Rust 支持高效的线程管理和数据共享,同时确保内存和线程安全。本文探讨 Rust 的线程与消息传递机制,并通过示例代码展示其应用。例如,使用 `Mutex` 实现线程同步,通过通道(channel)实现线程间安全通信。Rust 的并发模型结合了线程和消息传递的优势,确保了高效且安全的并行执行,适用于高性能和高并发场景。
61 0
|
3月前
|
开发框架 Android开发 iOS开发
跨平台开发的双重奏:Xamarin在不同规模项目中的实战表现与成功故事解析
【8月更文挑战第31天】在移动应用开发领域,选择合适的开发框架至关重要。Xamarin作为一款基于.NET的跨平台解决方案,凭借其独特的代码共享和快速迭代能力,赢得了广泛青睐。本文通过两个案例对比展示Xamarin的优势:一是初创公司利用Xamarin.Forms快速开发出适用于Android和iOS的应用;二是大型企业借助Xamarin实现高性能的原生应用体验及稳定的后端支持。无论是资源有限的小型企业还是需求复杂的大公司,Xamarin均能提供高效灵活的解决方案,彰显其在跨平台开发领域的强大实力。
46 0