java-CompletionService

简介: JDK的CompletionService提供了一种将生产新的异步任务与使用已完成任务的结果分离开来的服务,生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。

JDK的CompletionService提供了一种将生产新的异步任务与使用已完成任务的结果分离开来的服务,生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。

任务:

现在要向服务器发送HTTP请求,服务端对于每个请求都需要做很多额外操作,很消耗时间,则可以将每个请求接受之后,提交到CompletionService异步处理,等执行完毕之后,在返回给客户端。

import java.util.concurrent.Callable;  
import java.util.concurrent.CompletionService;  
import java.util.concurrent.ExecutorCompletionService;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
import java.util.concurrent.Future;  

public class CompletionServiceTest {  
    private ExecutorService threadPool = Executors.newCachedThreadPool();  
    private CompletionService<Response> completionService = new ExecutorCompletionService<Response>(  
            Executors.newCachedThreadPool());  

    public CompletionServiceTest() {  
        new Thread() {  
            public void run() {  
                while (true) {  
                    try {  
                        Future<Response> f = completionService.take();  
                        /** 
                         * 获取响应信息,返回给客户端 
                         * 如果completionService任务队列为空,此处将阻塞 
                         */  
                        Response resp = f.get();  
                        System.out.println(resp.getId());  
                    } catch (Exception e) {  
                        System.out.println("Exception happened:"+e.getMessage());  
                    }  
                }  
            };  
        }.start();  
    }  

    class Request{  
        private int rid;  
        private String body;  
        public int getRid() {  
            return rid;  
        }  
        public void setRid(int rid) {  
            this.rid = rid;  
        }  
        public String getBody() {  
            return body;  
        }  
        public void setBody(String body) {  
            this.body = body;  
        }  
    }  

    class Response {  
        private int id;  
        private String body;  
        public int getId() {  
            return id;  
        }  
        public void setId(int id) {  
            this.id = id;  
        }  
        public String getBody() {  
            return body;  
        }  
        public void setBody(String body) {  
            this.body = body;  
        }  
    }  

    class HTTPExecutor {  
        public Future<Response> execute(final Request request) {  
            Future<Response> f = threadPool.submit(new Callable<Response>() {  
                public Response call() throws Exception {  
                    Response response = new Response();  
                    Thread.currentThread().sleep(3000);  
                    response.setId(request.getRid());  
                    response.setBody("response");  
                    return response;  
                }  
            });  
            return f;  
        }  
    }  

    public void submitHTTP(final Request request) {  
        completionService.submit(new Callable<Response>() {  
            public Response call() throws Exception {  
                return new HTTPExecutor().execute(request).get();  
            }  
        });  

    }  

    public static void main(String[] args) {  

        CompletionServiceTest t = new CompletionServiceTest();  
        for (int i = 0; i < 10; i++) {  
            /** 
             * 发送10个HTTP请求 
             */  
            Request request =t.new Request();  
            request.setRid(i);  
            request.setBody("request");  
            t.submitHTTP(request);  
        }  

    }  

}  

ExecutorCompletionService源码

public ExecutorCompletionService(Executor executor) {  
        if (executor == null)  
            throw new NullPointerException();  
        this.executor = executor;  
        this.aes = (executor instanceof AbstractExecutorService) ?  
            (AbstractExecutorService) executor : null;  
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();  
    }  
public ExecutorCompletionService(Executor executor,  
                                     BlockingQueue<Future<V>> completionQueue) {  
        if (executor == null || completionQueue == null)  
            throw new NullPointerException();  
        this.executor = executor;  
        this.aes = (executor instanceof AbstractExecutorService) ?  
            (AbstractExecutorService) executor : null;  
        this.completionQueue = completionQueue;  
    }  

    public Future<V> submit(Callable<V> task) {  
        if (task == null) throw new NullPointerException();  
        RunnableFuture<V> f = newTaskFor(task);  
        executor.execute(new QueueingFuture(f));  
        return f;  
    }  

通过ExecutorCompletionService的构造器可知,CompletionService 依赖于一个单独的 Executor 来实际执行任务,内部管理了一个阻塞队列来,在调用submit方法时,会向创建一个新的RunnableFuture,然后异步执行该RunnableFuture,当其状态变为done后,添加CompletionService的阻塞队列中,外部通过调用take()(阻塞)或者poll()(非阻塞,为空返回null)方法获取执行结果。

目录
相关文章
|
Linux
8.4 【Linux】XFS 文件系统的备份与还原
8.4 【Linux】XFS 文件系统的备份与还原
241 0
|
应用服务中间件 网络安全 nginx
Nginx简易防CC策略规则
Nginx简易防CC策略规则
222 1
|
网络协议 Unix Linux
Linux NSS简解
Linux NSS简解
|
存储 Java 程序员
Java面试加分点!一文读懂HashMap底层实现与扩容机制
本文详细解析了Java中经典的HashMap数据结构,包括其底层实现、扩容机制、put和查找过程、哈希函数以及JDK 1.7与1.8的差异。通过数组、链表和红黑树的组合,HashMap实现了高效的键值对存储与检索。文章还介绍了HashMap在不同版本中的优化,帮助读者更好地理解和应用这一重要工具。
690 5
|
分布式计算 Hadoop Linux
Hadoop集群搭建记录 | 云计算[CentOS7] | 伪分布式集群 各节点之间免密登录
写在前面 step1 安装openssh-server step2 .ssh文件夹的创建及生成密钥文件 step3 传送该文件 step4 slave1&&slave2节点操作 step5 所有节点最终配置 免密登录成功
352 0
Hadoop集群搭建记录 | 云计算[CentOS7] | 伪分布式集群 各节点之间免密登录
|
4天前
|
云安全 人工智能 安全
AI被攻击怎么办?
阿里云提供 AI 全栈安全能力,其中对网络攻击的主动识别、智能阻断与快速响应构成其核心防线,依托原生安全防护为客户筑牢免疫屏障。
|
14天前
|
域名解析 人工智能
【实操攻略】手把手教学,免费领取.CN域名
即日起至2025年12月31日,购买万小智AI建站或云·企业官网,每单可免费领1个.CN域名首年!跟我了解领取攻略吧~
|
8天前
|
安全 Java Android开发
深度解析 Android 崩溃捕获原理及从崩溃到归因的闭环实践
崩溃堆栈全是 a.b.c?Native 错误查不到行号?本文详解 Android 崩溃采集全链路原理,教你如何把“天书”变“说明书”。RUM SDK 已支持一键接入。
567 211