模仿Tomcat的BIO模型:每来一个消息,就分配一个线程来处理.
主线程池的代码如下:
package com.guanjian;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Created by Administrator on 2018/7/10.
 *
 * <p>Main pool that hands messages to {@link MessageTask}s. The threading model is
 * chosen by {@code fixedThreadNum}:
 * <ul>
 *   <li>{@code 0}: one {@link SequentialMessageTask} per message, each run on its own
 *       thread of the main pool;</li>
 *   <li>otherwise: a single {@link ConcurrentMessageTask} runs on the main pool and
 *       forwards every message to that task's own worker pool.</li>
 * </ul>
 */
public class ThreadPool {
    private final ExecutorService service;
    // Written by startup(), read by the shutdown-hook thread, hence volatile.
    private volatile List<MessageTask> tasks = new ArrayList<>();
    private final int fixedThreadNum;
    private final List<String> messages;
    private final MessageHandler messageHandler;

    /**
     * Creates the main pool and registers a JVM shutdown hook that closes it.
     *
     * @param fixedThreadNum size of the main pool; {@code 0} selects the sequential
     *                       (thread-per-message) model
     * @param messages       messages to be handled by {@link #startup()}
     * @param messageHandler handler invoked for each message
     * @throws IllegalArgumentException if {@code fixedThreadNum} is negative
     */
    public ThreadPool(int fixedThreadNum, List<String> messages, MessageHandler messageHandler) {
        this.fixedThreadNum = fixedThreadNum;
        this.messages = messages;
        this.messageHandler = messageHandler;
        // Executors.newFixedThreadPool(0) throws IllegalArgumentException, which made the
        // sequential model unreachable. A cached pool gives each message its own thread.
        this.service = (fixedThreadNum == 0)
                ? Executors.newCachedThreadPool()
                : Executors.newFixedThreadPool(fixedThreadNum);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // Main pool first: its tasks may still be feeding the worker pools.
                shutdownGracefully(service);
                shutdownAsyncPools();
            }
        });
    }

    /**
     * Shuts the given pool down via {@link ShutdownPool#shutdownThreadPool}.
     *
     * @param threadPool the pool to shut down
     */
    public void shutdownGracefully(ExecutorService threadPool) {
        ShutdownPool.shutdownThreadPool(threadPool, "main-pool");
    }

    /**
     * Builds the tasks for the configured model and submits them to the main pool.
     * Every message is handed to the handler exactly once.
     */
    public void startup() {
        List<MessageTask> created = new ArrayList<>();
        if (fixedThreadNum == 0) {
            // One task per message, so each message is handled by its own pool thread.
            for (String message : messages) {
                List<String> single = new ArrayList<>(1);
                single.add(message);
                created.add(new SequentialMessageTask(messageHandler, single));
            }
        } else {
            // A single dispatching task; it already iterates over all messages itself.
            created.add(new ConcurrentMessageTask(messageHandler, messages));
        }
        tasks = created;
        for (MessageTask task : created) {
            service.execute(task);
        }
    }

    /** Shuts down the worker pool owned by each ConcurrentMessageTask that was started. */
    private void shutdownAsyncPools() {
        for (MessageTask task : tasks) {
            if (task instanceof ConcurrentMessageTask) {
                ((ConcurrentMessageTask) task).shutdown();
            }
        }
    }
}
ThreadPool通过线程数fixedThreadNum来区分使用哪种线程模型:fixedThreadNum为0时使用SequentialMessageTask,否则使用ConcurrentMessageTask.
package com.guanjian;
/**
 * Created by Administrator on 2018/7/10.
 *
 * <p>Callback that processes a single message.
 */
public interface MessageHandler {

    /**
     * Handles one message.
     *
     * @param message the message to process
     */
    void execute(String message);
}
package com.guanjian;
/**
 * Created by Administrator on 2018/7/10.
 *
 * <p>{@link MessageHandler} that writes each message to standard output.
 */
public class MessageHandlerImpl implements MessageHandler {

    /**
     * Prints the message followed by a line break on {@code System.out}.
     *
     * @param message the message to print
     */
    @Override
    public void execute(final String message) {
        System.out.println(message);
    }
}
以上是消息处理器的接口和实现类
package com.guanjian;
import java.util.List;
/**
 * Created by Administrator on 2018/7/10.
 *
 * <p>Runnable base class that walks a list of messages and passes each one to
 * {@link #handlerMessage(String)}; subclasses decide how a message is handled.
 */
public abstract class MessageTask implements Runnable {

    /** Handler available to subclasses for processing a message. */
    protected MessageHandler messageHandler;

    /** Messages iterated by {@link #run()}. */
    protected List<String> messages;

    /**
     * @param messageHandler handler made available to subclasses
     * @param messages       messages this task iterates over when run
     */
    MessageTask(MessageHandler messageHandler, List<String> messages) {
        this.messages = messages;
        this.messageHandler = messageHandler;
    }

    /** Calls {@link #handlerMessage(String)} once per message, in list order. */
    @Override
    public void run() {
        for (String current : messages) {
            handlerMessage(current);
        }
    }

    /**
     * Handles a single message.
     *
     * @param message the message to handle
     */
    protected abstract void handlerMessage(String message);
}
消息任务抽象类实现了Runnable接口,其run方法遍历消息列表,并对每条消息调用抽象方法handlerMessage;不同的子类通过实现handlerMessage来分别实现BIO,NIO线程模型.
package com.guanjian;
import java.util.List;
/**
 * Created by Administrator on 2018/7/10.
 *
 * <p>{@link MessageTask} that invokes the handler directly on the thread running the task.
 */
public class SequentialMessageTask extends MessageTask {

    /**
     * @param messageHandler handler invoked for each message
     * @param messages       messages to handle
     */
    SequentialMessageTask(MessageHandler messageHandler, List<String> messages) {
        super(messageHandler, messages);
    }

    /**
     * Passes the message straight to the handler on the calling thread.
     *
     * @param message the message to handle
     */
    @Override
    protected void handlerMessage(String message) {
        this.messageHandler.execute(message);
    }
}
BIO线程模型子类,通过主线程池来分配线程处理.
package com.guanjian;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Created by Administrator on 2018/7/10.
 *
 * <p>{@link MessageTask} that does not handle messages on the thread running the task;
 * each message is forwarded to a worker pool owned by this task.
 */
public class ConcurrentMessageTask extends MessageTask {

    /** Worker pool sized at twice the number of available processors. */
    private final ExecutorService asyncService =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

    /**
     * @param messageHandler handler invoked on the worker pool for each message
     * @param messages       messages to dispatch
     */
    ConcurrentMessageTask(MessageHandler messageHandler, List<String> messages) {
        super(messageHandler, messages);
    }

    /**
     * Queues the message for handling on the worker pool.
     *
     * @param message the message to handle
     */
    @Override
    protected void handlerMessage(String message) {
        // execute() rather than submit(): submit() stores any exception in a Future that
        // nobody reads, so handler failures would disappear silently.
        asyncService.execute(() -> messageHandler.execute(message));
    }

    /** Shuts the worker pool down via {@link ShutdownPool#shutdownThreadPool}. */
    protected void shutdown() {
        ShutdownPool.shutdownThreadPool(asyncService, "async-pool-" + Thread.currentThread().getId());
    }
}
NIO线程模型子类:消息不再由主线程池中的线程直接处理,而是提交给任务内部的异步线程池处理.该异步线程池类比于Netty中的Worker线程池,从BOSS线程池(即主线程池)中接管消息处理.
package com.guanjian;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Created by Administrator on 2018/7/10.
 *
 * <p>Helper for shutting down an {@link ExecutorService} gracefully.
 */
public class ShutdownPool {
    // Use this class as the logger category; it was previously ThreadPool.class.
    private static final Logger log = LoggerFactory.getLogger(ShutdownPool.class);

    /**
     * Gracefully shuts down a thread pool: stop accepting new tasks, wait up to 60 seconds
     * for running tasks, then interrupt them and wait up to another 60 seconds.
     *
     * @param threadPool the pool to shut down
     * @param alias      name used for the pool in log messages
     */
    public static void shutdownThreadPool(ExecutorService threadPool, String alias) {
        log.info("Start to shutdown the thead pool: {}", alias);
        threadPool.shutdown(); // No new tasks can be submitted from here on.
        try {
            // Wait for unfinished tasks to complete.
            if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                threadPool.shutdownNow(); // Cancel the tasks that are still running.
                log.warn("Interrupt the worker, which may cause some task inconsistent. Please check the biz logs.");
                // Wait for the tasks to respond to cancellation.
                if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                    log.error("Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent. Please check the biz logs.");
                }
            }
        } catch (InterruptedException ie) {
            // The calling thread was interrupted while waiting: cancel outstanding tasks.
            threadPool.shutdownNow();
            log.error("The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconcistent state. Please check the biz logs.");
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        log.info("Finally shutdown the thead pool: {}", alias);
    }
}
最后是线程池的优雅关闭,无论是主线程池还是异步线程池皆调用该方法实现优雅关闭.
以上只是模型代码,具体可替换成具体需要的业务代码来达到业务性能的提升.