并发编程6-执行器

简介: <p>如果想开发服务器应用,应该有大的吞吐量和快速的响应。  这样就要求服务器段有清晰的任务边界和任务执行策略。</p> <p><br></p> <p>现在看一个服务器应用:</p> <p></p> <pre code_snippet_id="599232" snippet_file_name="blog_20150206_1_5614423" name="code" class=

如果想开发服务器应用,应该有大的吞吐量和快速的响应。  这样就要求服务器段有清晰的任务边界和任务执行策略。


现在看一个服务器应用:

public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket(8080);
        while (true){
            Socket s = ss.accept();
            handleRequest(s);
        }
    }

顺序执行的,造成资源利用率低,  吞吐量或响应速度都很低。

来一个多线程的:

public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket(8080);
        while (true){
            final Socket s = ss.accept();
            new Thread(){
                @Override
                public void run() {
                    handleRequest(s);
                }
            }.start();
        }
    }
会有吞吐量,响应速度上的好处。

但是因为程序并发,可能要注意资源冲突。

另外其还会造成线程无线增长带来的栈溢出,内存溢出, 创建线程的开销过大等问题。


更好的办法是,能够平缓的进行优化,就是能够更好的调度任务,控制线程的个数等等。这就需要用到Executor框架

先来个固定线程数的:

    public static void main(String[] args) throws IOException {
        final Executor exec = Executors.newFixedThreadPool(3);
        ServerSocket ss = new ServerSocket(8080);
        while (true){
            final Socket s = ss.accept();
            Runnable task = new Runnable(){
                @Override
                public void run() {
                    handleRequest(s);
                }
            };
            exec.execute(task);
        }
    }
Exccutors.可以创建多个策略的执行器。 newFixedThreadPool是可以运行指定书目的执行器。

这个是一个生产者和消费者模式,  exec.execute是生产,  FixedThreadPool表示有空闲的线程就能消费这个task.


任务策略

Executor是用一个策略来执行任务,那么策略应该决定线程的那些行为呢:

  • 任务在什么线程中执行
  • 任务以什么顺序执行(FIFO, 优先级?)
  • 可以有多少个任务并发执行
  • 可以有多少个任务进入等待队列
  • 如果要舍弃一个任务,如何选择,并且如何告诉应用程序
  • 在执行任务前后应该做什么
我们需要使用和调整各种策略,来达到最优的效果

这些策略基本上都是基于线程池的。
线程池与任务队列紧密相连。
当线程池中有空闲线程时会从队列中拿一个任务进行执行。

生命周期

ExecutorService。 这个接口包含了很多生命周期的方法:

shutdown()  停止接收新任务,  等待已提交任务的完成,  再次想加入新任务会抛出RejectedExecutionException

shutdownNom  停止为执行的任务,并且尝试关闭正在运行的任务

awaitTermination等待到达终止状态,跟轮询判断isTrminated的效果一样。通常shutdown会紧随其后,进入种植状态后就关闭Executor

isShutdown 判断是否关闭

isTermination 判断是否进入终止状态:

例如可关闭的Server:

public class WebServer {
    private static final ExecutorService exec = Executors.newFixedThreadPool(1);
    public static void main(String[] args) throws IOException {

        ServerSocket ss = new ServerSocket(8080);
        new Thread(){
            @Override
            public void run() {
                while(!exec.isTerminated()){

                }
                System.exit(0);
            }
        }.start();
        while (true){
            final Socket s = ss.accept();
            Runnable task = new Runnable(){
                @Override
                public void run() {
                    handleRequest(s);
                }
            };
            exec.execute(task);
        }
    }

    private static void handleRequest(Socket s) {

        BufferedReader is = null;
        try {
            is = new BufferedReader(new InputStreamReader(s.getInputStream()));
            String line = is.readLine();
            if("exit".equals(line)){
                System.out.println("进来了");
                exec.shutdown();
            }
            System.out.println(line);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                is.close();
                s.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

定时执行

Timer 和TimerTask
使用简单,但是是点线程调度,并且任务如果抛出异常,会影响其他任务

ScheduleThreadPoolExecutor 1.5之后更好的调度执行器


可携带结果的任务Callable 和 Future

public class TestCallable {
    public static void main(String[] args) {
        List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        for (int i = 0; i < 5; i++) {
            futureList.add(executor.submit(new AddCallable(i, i + 1)));
        }

        for (Future<Integer> future : futureList) {
            try {
                System.out.println(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

class AddCallable implements Callable<Integer>{
    private int param1;
    private int param2;
    public AddCallable(int param1, int param2) {
        this.param1 = param1;
        this.param2 = param2;
    }

    @Override
    public Integer call() throws Exception {
        return param1 + param2;
    }
}
ExecutorService submit Callable进去。 返回Future, get()会阻塞到执行结果或者抛出异常, 如果抛出ExecutionException则可以使用getCause得到异常链。

如果第一个任务比第二个任务执行的时间长,则可能造成不能及时获取到返回结果。可以使用get(0)不停地试探,但是可以使用更好的方式使用自带阻塞队列的ExecutorCompletionService中

 public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletionService<String> serv =
                new ExecutorCompletionService<String>(executor);

        serv.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                TimeUnit.SECONDS.sleep(3);
                return "第一个任务执行完";
            }
        });
        serv.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                TimeUnit.SECONDS.sleep(1);
                return "第二个任务执行完";
            }
        });

        for (int i = 0; i < 2; i++) {
            System.out.println(serv.take().get());
        }

        executor.shutdown();
    }



相关文章
|
运维 监控 安全
【网络安全】护网系列-社工&溯源
【网络安全】护网系列-社工&溯源
1292 0
|
Web App开发 移动开发 JavaScript
JS - 微信浏览器(H5)语音录音插件(Recorder H5)
JS - 微信浏览器(H5)语音录音插件(Recorder H5)
2790 0
【ChatGLM】本地版ChatGPT ?6G显存即可轻松使用 !ChatGLM-6B 清华开源模型本地部署教程
【ChatGLM】本地版ChatGPT ?6G显存即可轻松使用 !ChatGLM-6B 清华开源模型本地部署教程
764 0
|
9月前
|
人工智能 搜索推荐 安全
数字孪生与教育:虚拟实验室的兴起
数字孪生技术通过模拟、分析和优化,为教育创新提供了新机遇。特别是在虚拟实验室的构建和应用上,数字孪生技术打破了物理限制,提供了丰富的学习体验,支持精准教学与个性化学习,有效培养学生的创新能力和实践能力。国内外高校已积极应用,未来将更加智能化、个性化。
|
10月前
|
存储 缓存 算法
什么是配置中心页面?
【10月更文挑战第24天】什么是配置中心页面?
201 3
|
11月前
|
人工智能 运维 安全
阿里云跻身央国企上云服务商“全量领导者”
在中国信息通信研究院与弗若斯特沙利文联合发布的《央国企上云服务商供应能力矩阵》三维全景图中,阿里云获评“全量领导者”,并在技术能力维度拿到最高分。
|
12月前
|
SQL 监控 大数据
通过Google Dataflow,我们能够构建一个高效、可扩展且易于维护的实时数据处理系统
【9月更文挑战第7天】随着大数据时代的到来,企业对高效数据处理的需求日益增加,特别是在实时分析和事件驱动应用中。Google Dataflow作为Google Cloud Platform的一项服务,凭借其灵活、可扩展的特点,成为实时大数据处理的首选。本文将介绍Dataflow的基本概念、优势,并通过一个电商日志分析的实际案例和示例代码,展示如何构建高效的数据处理管道。Dataflow不仅支持自动扩展和高可用性,还提供了多种编程语言支持和与GCP其他服务的紧密集成,简化了整个数据处理流程。通过Dataflow,企业可以快速响应业务需求,优化用户体验。
346 3
|
存储 监控 数据可视化
《惊爆!SLS 底层存储之谜大揭秘,竟不是 OSS?!真相令人瞠目结舌!》
【8月更文挑战第15天】在数字化时代,高效管理日志数据至关重要。阿里云日志服务(SLS)提供强大日志管理,支持数据收集、存储、查询与分析。不同于通用对象存储服务(OSS),SLS采用专为日志优化的存储架构,确保高效写入与快速检索。用户仅需调用SLS接口即可轻松管理日志,无需关注底层细节或自行编写复杂代码。SLS通过简化流程,为企业提供专业高效的日志服务解决方案。
246 4
|
机器学习/深度学习 运维 监控
云计算时代的运维革新:从传统到现代化的转变
【8月更文挑战第21天】本文旨在探讨云计算技术如何重塑了传统的运维模式,引领运维工作走向自动化、智能化和集成化的新阶段。我们将从云计算带来的挑战入手,深入分析现代运维的核心技术与实践,并展望其未来发展趋势。文章不包含代码示例,而是聚焦于理念和技术的演变以及它们对运维领域的影响。