java-CompletionService

简介: JDK的CompletionService提供了一种将生产新的异步任务与使用已完成任务的结果分离开来的服务,生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。

JDK的CompletionService提供了一种将生产新的异步任务与使用已完成任务的结果分离开来的服务,生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。

任务:

现在要向服务器发送HTTP请求,服务端对于每个请求都需要做很多额外操作,很消耗时间,则可以将每个请求接受之后,提交到CompletionService异步处理,等执行完毕之后,在返回给客户端。

import java.util.concurrent.Callable;  
import java.util.concurrent.CompletionService;  
import java.util.concurrent.ExecutorCompletionService;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
import java.util.concurrent.Future;  

public class CompletionServiceTest {  
    private ExecutorService threadPool = Executors.newCachedThreadPool();  
    private CompletionService<Response> completionService = new ExecutorCompletionService<Response>(  
            Executors.newCachedThreadPool());  

    public CompletionServiceTest() {  
        new Thread() {  
            public void run() {  
                while (true) {  
                    try {  
                        Future<Response> f = completionService.take();  
                        /** 
                         * 获取响应信息,返回给客户端 
                         * 如果completionService任务队列为空,此处将阻塞 
                         */  
                        Response resp = f.get();  
                        System.out.println(resp.getId());  
                    } catch (Exception e) {  
                        System.out.println("Exception happened:"+e.getMessage());  
                    }  
                }  
            };  
        }.start();  
    }  

    class Request{  
        private int rid;  
        private String body;  
        public int getRid() {  
            return rid;  
        }  
        public void setRid(int rid) {  
            this.rid = rid;  
        }  
        public String getBody() {  
            return body;  
        }  
        public void setBody(String body) {  
            this.body = body;  
        }  
    }  

    class Response {  
        private int id;  
        private String body;  
        public int getId() {  
            return id;  
        }  
        public void setId(int id) {  
            this.id = id;  
        }  
        public String getBody() {  
            return body;  
        }  
        public void setBody(String body) {  
            this.body = body;  
        }  
    }  

    class HTTPExecutor {  
        public Future<Response> execute(final Request request) {  
            Future<Response> f = threadPool.submit(new Callable<Response>() {  
                public Response call() throws Exception {  
                    Response response = new Response();  
                    Thread.currentThread().sleep(3000);  
                    response.setId(request.getRid());  
                    response.setBody("response");  
                    return response;  
                }  
            });  
            return f;  
        }  
    }  

    public void submitHTTP(final Request request) {  
        completionService.submit(new Callable<Response>() {  
            public Response call() throws Exception {  
                return new HTTPExecutor().execute(request).get();  
            }  
        });  

    }  

    public static void main(String[] args) {  

        CompletionServiceTest t = new CompletionServiceTest();  
        for (int i = 0; i < 10; i++) {  
            /** 
             * 发送10个HTTP请求 
             */  
            Request request =t.new Request();  
            request.setRid(i);  
            request.setBody("request");  
            t.submitHTTP(request);  
        }  

    }  

}  

ExecutorCompletionService源码

public ExecutorCompletionService(Executor executor) {  
        if (executor == null)  
            throw new NullPointerException();  
        this.executor = executor;  
        this.aes = (executor instanceof AbstractExecutorService) ?  
            (AbstractExecutorService) executor : null;  
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();  
    }  
public ExecutorCompletionService(Executor executor,  
                                     BlockingQueue<Future<V>> completionQueue) {  
        if (executor == null || completionQueue == null)  
            throw new NullPointerException();  
        this.executor = executor;  
        this.aes = (executor instanceof AbstractExecutorService) ?  
            (AbstractExecutorService) executor : null;  
        this.completionQueue = completionQueue;  
    }  

    public Future<V> submit(Callable<V> task) {  
        if (task == null) throw new NullPointerException();  
        RunnableFuture<V> f = newTaskFor(task);  
        executor.execute(new QueueingFuture(f));  
        return f;  
    }  

通过ExecutorCompletionService的构造器可知,CompletionService 依赖于一个单独的 Executor 来实际执行任务,内部管理了一个阻塞队列来,在调用submit方法时,会向创建一个新的RunnableFuture,然后异步执行该RunnableFuture,当其状态变为done后,添加CompletionService的阻塞队列中,外部通过调用take()(阻塞)或者poll()(非阻塞,为空返回null)方法获取执行结果。

目录
相关文章
|
7月前
|
Java
JAVA JUC Callable 接口
【1月更文挑战第5天】JAVA JUC Callable 接口
|
7月前
|
Java
Java 并发编程 Future及CompletionService
Java 并发编程 Future及CompletionService `Future`用于异步结果计算。它提供了一些方法来检查计算是否完成,使用`get`方法将阻塞线程直到结果返回 `CompletionService`整合了`Executor`和`BlockingQueue`的功能。将`Callable`任务提交给它去执行,使用`take()`和`poll()`获取最新完成的任务执行结果.
Java 并发编程 Future及CompletionService
|
7月前
|
设计模式 Java 编译器
Java中的线程池你了解多少?
Java中的线程池你了解多少?
65 0
|
安全 Java 容器
Java 中的FutureTask
Java 中的FutureTask
|
缓存 Java 调度
[java]线程池
[java]线程池
54 0
|
缓存 Java 调度
Java中线程池的使用
Java中线程池的使用
|
存储 缓存 Java
JAVA——线程池
JAVA——线程池
158 0
|
Java API 调度
聊聊Java中CompletableFuture的使用(下)
聊聊Java中CompletableFuture的使用
286 0
|
消息中间件 存储 算法
Java 中的线程池
本文主要介绍了线程池框架 Executor,ThreadPoolExecutor 的「构造参数」和「工作行为」,线程池的生命周期,Executors 提供的六种线程池
183 0
Java 中的线程池
|
Java
java 常用线程池介绍
java 常用线程池介绍
139 0
java 常用线程池介绍