多线程并发工具类

简介: 参考资料《Java并发编程的艺术》《Java编程思想》

1.等待多线程完成的CountDownLatch

    CountDownLatch允许一个或多个线程等待其他线程完成操作。即他可以实现与join()方法相同的功能,而且比join的功能更多。可以在初始化CountDownLatch时传入一个int参数来设置初始计数值,任何在CountDownLatch对象上调用wait()的方法都将被阻塞,直到这个CountDownLatch对象的计数值为0。CountDownLatch被设计为只能触发一次,计数值不能被重置。

    当我们调用CountDownLatch的countDown方法时,计数值N就会减1,CountDownLatch的await方法 会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个 点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,只需要把这个 CountDownLatch的引用传递到线程里即可。

    注意:计数器必须大于等于0,只是等于0时候,计数器就是零,则此时调用await方法时不会阻塞当前线程。CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数器的值。一个线程调用countDown方法发生之前,另外一个线程调用await方法。一个线程调用countDown方法并不会被阻塞,只有调用await()方法的线程才会被阻塞。

public class TestCountDownLatch {
	static CountDownLatch c=new CountDownLatch(8);
	public static void main(String[] args) throws InterruptedException {
		for(int i=1;i<=8;i++){
			Thread t=new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						Thread.sleep(1000);
						System.out.println(Thread.currentThread().getName()+"完成");
						c.countDown();//计数器减1
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			});
			t.start();
		}
		c.await();//主线程会在此处阻塞,直到CountDownLatch的计数器为0才会恢复
		System.out.println("完成所有准备任务");
		System.out.println("主程序开始执行");
	}
}

2.同步屏障CyclicBarrier

    CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

    CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

public class TestCyclicBarrier {
	static CyclicBarrier c=new CyclicBarrier(2);
	public static void main(String[] args) {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)即可解除阻塞
				} catch (Exception e) {
				}
				System.out.println(1);
			}
		}).start();
		try {
			c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)即可解除阻塞
		} catch (Exception e) {
			
		}
		System.out.println(2);
	}

}

    注意:如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),则主线程和子线程会永远等待, 因为没有第三个线程执行await方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。

    CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,保证会优先执行barrierAction,方便处理更复杂的业务场景。

public class TestCyclicBarrier1 {
	static CyclicBarrier c = new CyclicBarrier(2, new A());
	public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
		new Thread(new Runnable() {
			@Override
			public void run() {
				// TODO Auto-generated method stub
				try {
					c.await();
					System.out.println(2);
				} catch (Exception e) {
					// TODO: handle exception
				}
			}
		}).start();
		c.await();
		System.out.println(1);
	}
	static class A implements Runnable {
		@Override
		public void run() {
			System.out.println(3);
		}
	}
}
/*
 * 输出结果:
 * 3
 * 2
 * 1
 */

    因为CyclicBarrier设置了拦截线程的数量是2,所以必须等代码中的第一个线程和线程A 都执行完之后,才会继续执行主线程,所以输出结果为3 2 1。那么此时有一个问题,如果阻塞的线程数大于CyclicBarrier的计数器会怎样?

public class TestCyclicBarrier {
	static CyclicBarrier c=new CyclicBarrier(2);
	public static void main(String[] args) {
		for(int i=1;i<=3;i++){
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)
				} catch (Exception e) {
				}
				System.out.println(Thread.currentThread().getName());
			}
		}).start();
		}
		try {
			c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)
		} catch (Exception e) {
			
		}
		System.out.println(2);
	}

}

根据结果可以知道,CyclicBarrier可以自动重置计数器数量,当拦截线程数量为2时会把从阻塞队列中任意取出两个解除阻塞并执行,如果还有剩余的阻塞队列则会重置计数器,如果剩余阻塞队列数量小于计数器则会阻塞运行,也就是说,如果有阻塞队列数X与计数器N,X%N==0,那么所有线程都会执行,如果X%N!=0,那么会有部分线程处于阻塞状态无法执行。也可以手动调用 reset()方法来进行重置计数器。

    CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier 阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。

public class TestCyclicBarrier {
	static CyclicBarrier c=new CyclicBarrier(2);
	public static void main(String[] args) {
		Thread t=new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)
				} catch (Exception e) {
				}
				System.out.println(Thread.currentThread().getName());
			}
		});
		t.start();
		t.interrupt();
		try {
			c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)
			System.out.println(2);
		} catch (BrokenBarrierException|InterruptedException e) {
			System.out.println(c.isBroken());
		}
		
	}

}
/*
true
*/

3.控制并发线程数的Semaphore

    Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。换句话说,锁(Lock锁或synchronized锁)在任何时刻只允许一个任务访问被加锁的资源,而计数信号量允许n个任务同时访问这个资源,还可以将信号量看作是向外分发使用资源的“许可证”,尽管内部没有这种许可证对象。

    “许可证”的数量是有限的,所以当执有“许可证”的线程数量与“许可证”数量相同时,就会阻止其他线程对共享资源的使用,如果某一个或多个线程使用完共享资源后,就会归还“许可证”,此时Semaphore(信号量)就会将这些归还的“许可证”再次分发给阻塞中的线程。通过这种方式就实现了控制线程并发数。

    3.1 API

  • Semaphore(int permits):构造器,接受一个整型的数字,表示可用的许可证数量。
  • acquire():线程调用该方法获取一个许可证来获取使用共享资源的资格。
  • release():线程使用完共享资源之后调用方法归还许可证。
  • tryAcquire():线程调用该方法尝试获取许可证。
  • int availablePermits():返回此信号量中当前可用的许可证数。
  • int getQueueLength():返回正在等待获取许可证的线程数。
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermits(int reduction):减少reduction个许可证,是个protected方法。
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方 法。

    3.2 应用

    Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程 并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这 时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连 接。这个时候,就可以使用Semaphore来做流量控制。

public class TestSemaphore {
	static Semaphore s=new Semaphore(10);
	static int threadNum=30;
	public static void main(String[] args) {
		for(int i=1;i<=30;i++){
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						s.acquire();
						System.out.println(Thread.currentThread().getName()+"do some work");
						s.release();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					
				}
			}).start();
		}
	}
}

4. 线程间交换数据的Exchanger

    Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也 执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。所以由此可见,Exchanger将会与 生产者-消费者模型相关。

    其应用场景有:Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换 两人的数据,并使用交叉规则得出2个交配结果。Exchanger也可以用于校对工作,比如我们需 要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行 录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致。如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发 生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长。

public class TestExchanger {
	static Exchanger<String> ex=new Exchanger<String>();
	static ExecutorService service=Executors.newFixedThreadPool(2);
	public static void main(String[] args) {
		service.execute(new Runnable() {
			@Override
			public void run() {
				String a="A";
				try {
					ex.exchange(a);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		});
		service.execute(new Runnable() {
			@Override
			public void run() {
				String b="B";
				try {
					String a=ex.exchange(b);
					System.out.println("a录入的是"+a);
					System.out.println("b录入的是"+b);
					System.out.println("a与b是否一致:"+a.equalsIgnoreCase(b));
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		});
		service.shutdown();
	}
}

 

相关文章
|
1月前
|
安全 数据库连接 数据库
连接池的并发和线程安全
连接池的并发和线程安全
|
6天前
|
安全 Java
深入理解 Java 多线程和并发工具类
【4月更文挑战第19天】本文探讨了Java多线程和并发工具类在实现高性能应用程序中的关键作用。通过继承`Thread`或实现`Runnable`创建线程,利用`Executors`管理线程池,以及使用`Semaphore`、`CountDownLatch`和`CyclicBarrier`进行线程同步。保证线程安全、实现线程协作和性能调优(如设置线程池大小、避免不必要同步)是重要环节。理解并恰当运用这些工具能提升程序效率和可靠性。
|
8天前
|
Java 开发者
Java中多线程并发控制的实现与优化
【4月更文挑战第17天】 在现代软件开发中,多线程编程已成为提升应用性能和响应能力的关键手段。特别是在Java语言中,由于其平台无关性和强大的运行时环境,多线程技术的应用尤为广泛。本文将深入探讨Java多线程的并发控制机制,包括基本的同步方法、死锁问题以及高级并发工具如java.util.concurrent包的使用。通过分析多线程环境下的竞态条件、资源争夺和线程协调问题,我们提出了一系列实现和优化策略,旨在帮助开发者构建更加健壮、高效的多线程应用。
7 0
|
11天前
|
Java API 调度
安卓多线程和并发处理:提高应用效率
【4月更文挑战第13天】本文探讨了安卓应用中多线程和并发处理的优化方法,包括使用Thread、AsyncTask、Loader、IntentService、JobScheduler、WorkManager以及线程池。此外,还介绍了RxJava和Kotlin协程作为异步编程工具。理解并恰当运用这些技术能提升应用效率,避免UI卡顿,确保良好用户体验。随着安卓技术发展,更高级的异步处理工具将助力开发者构建高性能应用。
|
24天前
|
安全 Java
Java中的多线程并发控制
在Java中,多线程是实现并发执行任务的一种重要方式。然而,随着多个线程同时访问共享资源,可能会导致数据不一致和其他并发问题。因此,了解并掌握Java中的多线程并发控制机制显得尤为重要。本文将深入探讨Java的多线程并发控制,包括synchronized关键字、Lock接口、Semaphore类以及CountDownLatch类等,并通过实例代码演示其使用方法和注意事项。
12 2
|
30天前
|
算法 安全 Unix
【C++ 20 信号量 】C++ 线程同步新特性 C++ 20 std::counting_semaphore 信号量的用法 控制对共享资源的并发访问
【C++ 20 信号量 】C++ 线程同步新特性 C++ 20 std::counting_semaphore 信号量的用法 控制对共享资源的并发访问
30 0
|
1月前
|
负载均衡 Java 数据处理
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(三)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
52 2
|
1月前
|
存储 监控 Java
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(二)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
42 1
|
1月前
|
负载均衡 安全 Java
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(一)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
57 2
|
1月前
|
人工智能 缓存 前端开发
ai对话---多线程并发处理问题
ai对话---多线程并发处理问题