[Java 并发基础]多线程编程

简介: [Java 并发基础]多线程编程

线程的创建方式

继承 Thread

  1. 创建一个继承 Thread 类的子类。
  2. 重写 Thread 类的 run() 方法。
  3. 在 run() 方法中编写线程要执行的任务。
  4. 创建 Thread 子类的对象。
  5. 调用 Thread 子类对象的 start() 方法来启动线程。
public class Demo {

    static class MyThread extends Thread {
        @Override
        public void run() {
            System.out.println(">>>> run ");
        }
    }

    public static void main(String[] args) {
        new MyThread().start();
        
    }
}

实现 Runnable 接口

  1. 创建一个实现 Runnable 接口的类。
  2. 在 Runnable 接口的 run() 方法中编写线程要执行的任务。
  3. 创建 Runnable 接口的实现类的对象。
  4. 将 Runnable 接口的实现类的对象传递给 Thread 类的构造方法来创建 Thread 对象。
  5. 调用 Thread 对象的 start() 方法来启动线程。
package org.example.create;

public class Demo1 {

    static class MyThread1 implements Runnable {
        @Override
        public void run() {
            System.out.println(">>>>> 2. 实现Runnable接口");
        }
    }

    public static void main(String[] args) throws Exception {
        new MyThread1().run();
        System.out.println("end");

    }
}

实现 Callable 接口

package org.example.create;

import java.util.concurrent.Callable;

public class Demo2 {

    static class MyThread implements Callable<Void> {
        @Override
        public Void call() throws Exception {
            System.out.println("3. 实现 Callable ");
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        new MyThread().call();
        System.out.println("end");

    }
}


使用 Lambda

package org.example.create;

public class Demo3 {


    public static void main(String[] args) throws Exception {
        new Thread(()->{
            System.out.println("4. 使用 lambda 表达式");
        }).run();
        System.out.println("end");

    }
}

使用线程池

package org.example.create;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo4 {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);

        // 创建Runnable对象
        Runnable runnable = () -> {
            // 线程的执行逻辑
            System.out.println(Thread.currentThread().getId() + "线程执行逻辑");
        };

        // 提交任务给线程池
        for (int i = 0; i < 50; i++) {
            executor.submit(runnable);
        }

        // 关闭线程池
        executor.shutdown();
    }

}

线程创建相关的 jdk源码

Thread

package java.lang;

public class Thread implements Runnable {
    /* Make sure registerNatives is the first thing <clinit> does. */
    private static native void registerNatives();
    static {
        registerNatives();
    }
}

Runnable函数接口

package java.lang;
/**
The Runnable interface should be implemented by any class whose instances are intended to be executed by a thread. The class must define a method of no arguments called run.
This interface is designed to provide a common protocol for objects that wish to execute code while they are active. For example, Runnable is implemented by class Thread. Being active simply means that a thread has been started and has not yet been stopped.
In addition, Runnable provides the means for a class to be active while not subclassing Thread. A class that implements Runnable can run without subclassing Thread by instantiating a Thread instance and passing itself in as the target. In most cases, the Runnable interface should be used if you are only planning to override the run() method and no other Thread methods. This is important because classes should not be subclassed unless the programmer intends on modifying or enhancing the fundamental behavior of the class.
Since:
1.0
See Also:
Thread, java.util.concurrent.Callable

**/
@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface {@code Runnable} is used
     * to create a thread, starting the thread causes the object's
     * {@code run} method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method {@code run} is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

Callable<V>函数接口

package java.util.concurrent;
/**
A task that returns a result and may throw an exception. Implementors define a single method with no arguments called call.
The Callable interface is similar to Runnable, in that both are designed for classes whose instances are potentially executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception.
The Executors class contains utility methods to convert from other common forms to Callable classes.
Since:
1.5
See Also:
Executor
Author:
Doug Lea
Type parameters:
<V> – the result type of method call
**/
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

executors

package java.util.concurrent;
/**
Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods:
Methods that create and return an ExecutorService set up with commonly useful configuration settings.
Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.
Methods that create and return a "wrapped" ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.
Methods that create and return a ThreadFactory that sets newly created threads to a known state.
Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.
Since:
1.5
Author:
Doug Lea
**/
public class Executors {}

线程扩展知识

future

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html

java.util.concurrent
Interface Future<V>
● Type Parameters:V - The result type returned by this Future's get methodAll Known Subinterfaces:
  ○ Response<T>, 
  ○ RunnableFuture<V>, 
  ○ RunnableScheduledFuture<V>, 
  ○ ScheduledFuture<V>
● All Known Implementing Classes:
CompletableFuture, CountedCompleter, ForkJoinTask, FutureTask, RecursiveAction, RecursiveTask, SwingWorker

public interface Future<V>


A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the cancel method. Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled. If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task.

Sample Usage (Note that the following classes are all made-up.)

interface ArchiveSearcher { String search(String target); }
class App {
    ExecutorService executor = ...
    ArchiveSearcher searcher = ...
    void showSearch(final String target)
    throws InterruptedException {
        Future<String> future
        = executor.submit(new Callable<String>() {
            public String call() {
                return searcher.search(target);
            }});
        displayOtherThings(); // do other things while searching
        try {
            displayText(future.get()); // use future
        } catch (ExecutionException ex) { cleanup(); return; }
    }
}

The FutureTask class is an implementation of Future that implements Runnable, and so may be executed by an Executor. For example, the above construction with submit could be replaced by:

FutureTask<String> future =
new FutureTask<String>(new Callable<String>() {
    public String call() {
        return searcher.search(target);
    }});
executor.execute(future);

Memory consistency effects: Actions taken by the asynchronous computation happen-before actions following the corresponding Future.get() in another thread.

内存一致性影响:异步计算所采取的操作发生在另一个线程中对应的Future.get()之后的操作之前。

Method Summary

Modifier and Type Method and Description
boolean cancel(boolean mayInterruptIfRunning)
Attempts to cancel execution of this task.


V get()
Waits if necessary for the computation to complete, and then retrieves its result.
V get(long timeout, TimeUnit unit)
Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available.
boolean isCancelled()
Returns true if this task was cancelled before it completed normally.
boolean isDone()
Returns true if this task completed.

简单示例代码

package org.example.future;

import org.example.service.MedalService;
import org.example.service.UserInfoService;

import java.util.concurrent.*;

public class FutureTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        UserInfoService userInfoService = new UserInfoService();
        MedalService medalService = new MedalService();
        long userId = 666L;
        long startTime = System.currentTimeMillis();

        //调用用户服务获取用户基本信息
        FutureTask<UserInfoService.UserInfo> userInfoFutureTask = new FutureTask<>(new Callable<UserInfoService.UserInfo>() {
            @Override
            public UserInfoService.UserInfo call() throws Exception {
                return userInfoService.getUserInfo(userId);
            }
        });
        executorService.submit(userInfoFutureTask);

        Thread.sleep(300); //模拟主线程其它操作耗时

        FutureTask<MedalService.MedalInfo> medalInfoFutureTask = new FutureTask<>(new Callable<MedalService.MedalInfo>() {
            @Override
            public MedalService.MedalInfo call() throws Exception {
                return medalService.getMedalInfo(userId);
            }
        });
        executorService.submit(medalInfoFutureTask);

        UserInfoService.UserInfo userInfo = userInfoFutureTask.get();//获取个人信息结果
        MedalService.MedalInfo medalInfo = medalInfoFutureTask.get();//获取勋章信息结果
        System.out.println(userInfo);
        System.out.println(medalInfo);
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
        executorService.shutdown();
    }
}



CompletableFuture

异步编程利器:CompletableFuture详解 |Java 开发实战 - 掘金

java.lang.Object
java.util.concurrent.CompletableFuture<T>
All Implemented Interfaces:
CompletionStage<T>, Future<T>

public class CompletableFuture<T>
extends Object
implements Future<T>, CompletionStage<T>

A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion.

When two or more threads attempt to complete, completeExceptionally, or cancel a CompletableFuture, only one of them succeeds.


可以显式完成的Future(设置其值和状态),可以用作CompletionStage,支持在其完成时触发的依赖函数和操作。

当两个或多个线程尝试完成、completeException或取消一个CompletableFuture时,只有一个线程成功。


In addition to these and related methods for directly manipulating status and results, CompletableFuture implements interface CompletionStage with the following policies:


Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

为非异步方法的依赖完成提供的操作可以由完成当前CompletableFuture的线程执行,也可以由完成方法的任何其他调用者执行。


All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task). To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interface CompletableFuture.AsynchronousCompletionTask.

所有没有显式Executor参数的异步方法都使用 ForkJoinPool.commonPool() 执行(除非它不支持至少两个并行级别,在这种情况下,创建一个新线程来运行每个任务)。为了简化监视、调试和跟踪,所有生成的异步任务都是标记接口 CompletableFuture.AsynchronousCompletionTask 的实例。


All CompletionStage methods are implemented independently of other public methods, so the behavior of one method is not impacted by overrides of others in subclasses.

所有CompletionStage方法的实现都独立于其他公共方法,因此一个方法的行为不受子类中其他方法的覆盖的影响。


CompletableFuture also implements Future with the following policies:


Since (unlike FutureTask) this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion. Method cancel has the same effect as completeExceptionally(new CancellationException()). Method isCompletedExceptionally() can be used to determine if a CompletableFuture completed in any exceptional fashion.

由于(与FutureTask不同)该类对导致其完成的计算没有直接控制,因此取消被视为另一种形式的异常完成。方法 cancel 与 completeException (new CancellationException()) 具有相同的效果。方法 isCompletedException() 可用于确定 CompletableFuture 是否以任何异常方式完成。


In case of exceptional completion with a CompletionException, methods get() and get(long, TimeUnit) throw an ExecutionException with the same cause as held in the corresponding CompletionException. To simplify usage in most contexts, this class also defines methods join() and getNow(T) that instead throw the CompletionException directly in these cases.

在使用 CompletionException 异常完成的情况下,方法 get() 和 get(long, TimeUnit) 抛出一个 ExecutionException,其原因与对应的 CompletionException 中持有的原因相同。

为了简化大多数上下文中的使用,这个类还定义了 join() 和 getNow(T) 方法,在这些情况下直接抛出 CompletionException。

All MethodsStatic MethodsInstance MethodsConcrete Methods

Modifier and Type Method and Description

CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
Returns a new CompletionStage that, when either this or the other given stage complete normally, is executed with the corresponding result as argument to the supplied action.

CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
Returns a new CompletionStage that, when either this or the other given stage complete normally, is executed using this stage's default asynchronous execution facility, with the corresponding result as argument to the supplied action.

CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
Returns a new CompletionStage that, when either this or the other given stage complete normally, is executed using the supplied executor, with the corresponding result as argument to the supplied function.

static CompletableFuture<Void>  allOf(CompletableFuture<?>... cfs)
Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete.

static CompletableFuture<Object>  anyOf(CompletableFuture<?>... cfs)
Returns a new CompletableFuture that is completed when any of the given CompletableFutures complete, with the same result.

<U> CompletableFuture<U>  applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
Returns a new CompletionStage that, when either this or the other given stage complete normally, is executed with the corresponding result as argument to the supplied function.

<U> CompletableFuture<U>  applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
Returns a new CompletionStage that, when either this or the other given stage complete normally, is executed using this stage's default asynchronous execution facility, with the corresponding result as argument to the supplied function.

<U> CompletableFuture<U>  applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
Returns a new CompletionStage that, when either this or the other given stage complete normally, is executed using the supplied executor, with the corresponding result as argument to the supplied function.

boolean cancel(boolean mayInterruptIfRunning)
If not already completed, completes this CompletableFuture with a CancellationException.

boolean complete(T value)
If not already completed, sets the value returned by get() and related methods to the given value.

static <U> CompletableFuture<U> completedFuture(U value)
Returns a new CompletableFuture that is already completed with the given value.

boolean completeExceptionally(Throwable ex)
If not already completed, causes invocations of get() and related methods to throw the given exception.

CompletableFuture<T>  exceptionally(Function<Throwable,? extends T> fn)
Returns a new CompletableFuture that is completed when this CompletableFuture completes, with the result of the given function of the exception triggering this CompletableFuture's completion when it completes exceptionally; otherwise, if this CompletableFuture completes normally, then the returned CompletableFuture also completes normally with the same value.

T get()
Waits if necessary for this future to complete, and then returns its result.

T get(long timeout, TimeUnit unit)
Waits if necessary for at most the given time for this future to complete, and then returns its result, if available.

T getNow(T valueIfAbsent)
Returns the result value (or throws any encountered exception) if completed, else returns the given valueIfAbsent.

int getNumberOfDependents()
Returns the estimated number of CompletableFutures whose completions are awaiting completion of this CompletableFuture.

<U> CompletableFuture<U>  handle(BiFunction<? super T,Throwable,? extends U> fn)
Returns a new CompletionStage that, when this stage completes either normally or exceptionally, is executed with this stage's result and exception as arguments to the supplied function.

<U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
Returns a new CompletionStage that, when this stage completes either normally or exceptionally, is executed using this stage's default asynchronous execution facility, with this stage's result and exception as arguments to the supplied function.

<U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
Returns a new CompletionStage that, when this stage completes either normally or exceptionally, is executed using the supplied executor, with this stage's result and exception as arguments to the supplied function.

boolean isCancelled()
Returns true if this CompletableFuture was cancelled before it completed normally.

boolean isCompletedExceptionally()
Returns true if this CompletableFuture completed exceptionally, in any way.

boolean isDone()
Returns true if completed in any fashion: normally, exceptionally, or via cancellation.

T join()
Returns the result value when complete, or throws an (unchecked) exception if completed exceptionally.

void  obtrudeException(Throwable ex)
Forcibly causes subsequent invocations of method get() and related methods to throw the given exception, whether or not already completed.

void  obtrudeValue(T value)
Forcibly sets or resets the value subsequently returned by method get() and related methods, whether or not already completed.

CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
Returns a new CompletionStage that, when this and the other given stage both complete normally, executes the given action.

CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
Returns a new CompletionStage that, when this and the other given stage complete normally, executes the given action using this stage's default asynchronous execution facility.

CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
Returns a new CompletionStage that, when this and the other given stage complete normally, executes the given action using the supplied executor.

CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
Returns a new CompletionStage that, when either this or the other given stage complete normally, executes the given action.

CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action)
Returns a new CompletionStage that, when either this or the other given stage complete normally, executes the given action using this stage's default asynchronous execution facility.

CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)
Returns a new CompletionStage that, when either this or the other given stage complete normally, executes the given action using the supplied executor.

static CompletableFuture<Void>  runAsync(Runnable runnable)
Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() after it runs the given action.

static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)
Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor after it runs the given action.


static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.


static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.

CompletableFuture<Void> thenAccept(Consumer<? super T> action)
Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied action.


CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
Returns a new CompletionStage that, when this stage completes normally, is executed using this stage's default asynchronous execution facility, with this stage's result as the argument to the supplied action.

CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied action.


<U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
Returns a new CompletionStage that, when this and the other given stage both complete normally, is executed with the two results as arguments to the supplied action.


<U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using this stage's default asynchronous execution facility, with the two results as arguments to the supplied action.


<U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using the supplied executor, with the two results as arguments to the supplied function.


<U> CompletableFuture<U>  thenApply(Function<? super T,? extends U> fn)
Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function.


<U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn)
Returns a new CompletionStage that, when this stage completes normally, is executed using this stage's default asynchronous execution facility, with this stage's result as the argument to the supplied function.


<U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function.


<U,V> CompletableFuture<V>  thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
Returns a new CompletionStage that, when this and the other given stage both complete normally, is executed with the two results as arguments to the supplied function.

<U,V> CompletableFuture<V>  thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using this stage's default asynchronous execution facility, with the two results as arguments to the supplied function.

<U,V> CompletableFuture<V>  thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using the supplied executor, with the two results as arguments to the supplied function.


<U> CompletableFuture<U>  thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
Returns a new CompletionStage that, when this stage completes normally, is executed with this stage as the argument to the supplied function.


<U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
Returns a new CompletionStage that, when this stage completes normally, is executed using this stage's default asynchronous execution facility, with this stage as the argument to the supplied function.


<U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function.


CompletableFuture<Void> thenRun(Runnable action)
Returns a new CompletionStage that, when this stage completes normally, executes the given action.


CompletableFuture<Void> thenRunAsync(Runnable action)
Returns a new CompletionStage that, when this stage completes normally, executes the given action using this stage's default asynchronous execution facility.


CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
Returns a new CompletionStage that, when this stage completes normally, executes the given action using the supplied Executor.

CompletableFuture<T>  toCompletableFuture()
Returns this CompletableFuture.

String  toString()
Returns a string identifying this CompletableFuture, as well as its completion state.


CompletableFuture<T>  whenComplete(BiConsumer<? super T,? super Throwable> action)
Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes.


CompletableFuture<T>  whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
Returns a new CompletionStage with the same result or exception as this stage, that executes the given action using this stage's default asynchronous execution facility when this stage completes.

CompletableFuture<T>  whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
Returns a new CompletionStage with the same result or exception as this stage, that executes the given action using the supplied Executor when this stage completes.

简单示例代码

package org.example.future;

import org.example.service.MedalService;
import org.example.service.UserInfoService;

import java.util.concurrent.*;

import static org.example.service.MedalService.MedalInfo;
import static org.example.service.UserInfoService.UserInfo;

public class CompletableFutureTest {

    public static void main(String[] args) throws ExecutionException, TimeoutException, InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        UserInfoService userInfoService = new UserInfoService();
        MedalService medalService = new MedalService();
        long userId = 666L;
        long startTime = System.currentTimeMillis();
        //调用用户服务获取用户基本信息
        CompletableFuture<UserInfo> completableUserInfoFuture = CompletableFuture
            .supplyAsync(() -> userInfoService.getUserInfo(userId), executorService);

        Thread.sleep(300); //模拟主线程其它操作耗时

        CompletableFuture<MedalInfo> completableMedalInfoFuture = CompletableFuture
            .supplyAsync(() -> medalService.getMedalInfo(userId), executorService);
        UserInfo userInfo = completableUserInfoFuture.get(2, TimeUnit.SECONDS);//获取个人信息结果
        MedalInfo medalInfo = completableMedalInfoFuture.get();//获取勋章信息结果
        System.out.println(userInfo);
        System.out.println(medalInfo);
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
        executorService.shutdown();
    }
}

CompletableFuture使用场景

创建异步任务

CompletableFuture创建异步任务,一般有supplyAsync和runAsync两个方法

  • supplyAsync执行CompletableFuture任务,支持返回值
  • runAsync执行CompletableFuture任务,没有返回值。
supplyAsync方法
//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//自定义线程,根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync方法
//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable) 
//自定义线程,根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)

实例代码如下:

public class FutureTest {

    public static void main(String[] args) {
        //可以自定义线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        //runAsync的使用
        CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("run,关注公众号:捡田螺的小男孩"), executor);
        //supplyAsync的使用
        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
            System.out.print("supply,关注公众号:捡田螺的小男孩");
            return "捡田螺的小男孩"; }, executor);
        //runAsync的future没有返回值,输出null
        System.out.println(runFuture.join());
        //supplyAsync的future,有返回值
        System.out.println(supplyFuture.join());
        executor.shutdown(); // 线程池需要关闭
    }
}
//输出
run,关注公众号:捡田螺的小男孩
null
supply,关注公众号:捡田螺的小男孩捡田螺的小男孩

任务异步回调

1. thenRun/thenRunAsync
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);

CompletableFuture的thenRun方法,通俗点讲就是,做完第一个任务后,再做第二个任务。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值

public class FutureThenRunTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
            ()->{
                System.out.println("先执行第一个CompletableFuture方法任务");
                return "捡田螺的小男孩";
            }
        );

        CompletableFuture thenRunFuture = orgFuture.thenRun(() -> {
            System.out.println("接着执行第二个任务");
        });

        System.out.println(thenRunFuture.get());
    }
}
//输出
先执行第一个CompletableFuture方法任务
接着执行第二个任务
null

thenRun 和thenRunAsync有什么区别呢?可以看下源码哈:

java
复制代码   private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}

如果你执行第一个任务的时候,传入了一个自定义线程池:

  • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池
  • 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池

TIPS: 后面介绍的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个哈。

2.thenAccept/thenAcceptAsync

CompletableFuture的thenAccept方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。

csharp
复制代码public class FutureThenAcceptTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                ()->{
                    System.out.println("原始CompletableFuture方法任务");
                    return "捡田螺的小男孩";
                }
        );

        CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> {
            if ("捡田螺的小男孩".equals(a)) {
                System.out.println("关注了");
            }

            System.out.println("先考虑考虑");
        });

        System.out.println(thenAcceptFuture.get());
    }
}
3. thenApply/thenApplyAsync

CompletableFuture的thenApply方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

csharp
复制代码public class FutureThenApplyTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                ()->{
                    System.out.println("原始CompletableFuture方法任务");
                    return "捡田螺的小男孩";
                }
        );

        CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> {
            if ("捡田螺的小男孩".equals(a)) {
                return "关注了";
            }

            return "先考虑考虑";
        });

        System.out.println(thenApplyFuture.get());
    }
}
//输出
原始CompletableFuture方法任务
关注了
4. exceptionally

CompletableFuture的exceptionally方法表示,某个任务执行异常时,执行的回调方法;并且有抛出异常作为参数,传递到回调方法。

php
复制代码public class FutureExceptionTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                ()->{
                    System.out.println("当前线程名称:" + Thread.currentThread().getName());
                    throw new RuntimeException();
                }
        );

        CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> {
            e.printStackTrace();
            return "你的程序异常啦";
        });

        System.out.println(exceptionFuture.get());
    }
}
//输出
当前线程名称:ForkJoinPool.commonPool-worker-1
java.util.concurrent.CompletionException: java.lang.RuntimeException
  at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
  at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
  at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
  at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
  at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
  at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
  at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException
  at cn.eovie.future.FutureWhenTest.lambda$main$0(FutureWhenTest.java:13)
  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
  ... 5 more
你的程序异常啦
5. whenComplete方法

CompletableFuture的whenComplete方法表示,某个任务执行完成后,执行的回调方法,无返回值;并且whenComplete方法返回的CompletableFuture的result是上个任务的结果

csharp
复制代码public class FutureWhenTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                ()->{
                    System.out.println("当前线程名称:" + Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "捡田螺的小男孩";
                }
        );

        CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> {
            System.out.println("当前线程名称:" + Thread.currentThread().getName());
            System.out.println("上个任务执行完啦,还把" + a + "传过来");
            if ("捡田螺的小男孩".equals(a)) {
                System.out.println("666");
            }
            System.out.println("233333");
        });

        System.out.println(rstFuture.get());
    }
}
//输出
当前线程名称:ForkJoinPool.commonPool-worker-1
当前线程名称:ForkJoinPool.commonPool-worker-1
上个任务执行完啦,还把捡田螺的小男孩传过来
666
233333
捡田螺的小男孩
6. handle方法

CompletableFuture的handle方法表示,某个任务执行完成后,执行回调方法,并且是有返回值的;并且handle方法返回的CompletableFuture的result是回调方法执行的结果。

csharp
复制代码public class FutureHandlerTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                ()->{
                    System.out.println("当前线程名称:" + Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "捡田螺的小男孩";
                }
        );

        CompletableFuture<String> rstFuture = orgFuture.handle((a, throwable) -> {

            System.out.println("上个任务执行完啦,还把" + a + "传过来");
            if ("捡田螺的小男孩".equals(a)) {
                System.out.println("666");
                return "关注了";
            }
            System.out.println("233333");
            return null;
        });

        System.out.println(rstFuture.get());
    }
}
//输出
当前线程名称:ForkJoinPool.commonPool-worker-1
上个任务执行完啦,还把捡田螺的小男孩传过来
666
关注了

多个任务组合处理

AND组合关系

thenCombine / thenAcceptBoth / runAfterBoth都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务

区别在于:

  • thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
  • thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
  • runAfterBoth 不会把执行结果当做方法入参,且没有返回值。
arduino
复制代码public class ThenCombineTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {

        CompletableFuture<String> first = CompletableFuture.completedFuture("第一个异步任务");
        ExecutorService executor = Executors.newFixedThreadPool(10);
        CompletableFuture<String> future = CompletableFuture
                //第二个异步任务
                .supplyAsync(() -> "第二个异步任务", executor)
                // (w, s) -> System.out.println(s) 是第三个任务
                .thenCombineAsync(first, (s, w) -> {
                    System.out.println(w);
                    System.out.println(s);
                    return "两个异步任务的组合";
                }, executor);
        System.out.println(future.join());
        executor.shutdown();

    }
}
//输出
第一个异步任务
第二个异步任务
两个异步任务的组合
OR 组合的关系

applyToEither / acceptEither / runAfterEither 都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。

区别在于:

  • applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值
  • acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值
  • runAfterEither: 不会把执行结果当做方法入参,且没有返回值。
kotlin
复制代码public class AcceptEitherTest {
    public static void main(String[] args) {
        //第一个异步任务,休眠2秒,保证它执行晚点
        CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{
            try{

                Thread.sleep(2000L);
                System.out.println("执行完第一个异步任务");}
                catch (Exception e){
                    return "第一个任务异常";
                }
            return "第一个异步任务";
        });
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> future = CompletableFuture
                //第二个异步任务
                .supplyAsync(() -> {
                            System.out.println("执行完第二个任务");
                            return "第二个任务";}
                , executor)
                //第三个任务
                .acceptEitherAsync(first, System.out::println, executor);

        executor.shutdown();
    }
}
//输出
执行完第二个任务
第二个任务

AllOf

所有任务都执行完成后,才执行 allOf返回的CompletableFuture。如果任意一个任务异常,allOf的CompletableFuture,执行get方法,会抛出异常

kotlin
复制代码public class allOfFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
            System.out.println("我执行完了");
        });
        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
            System.out.println("我也执行完了");
        });
        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m,k)->{
            System.out.println("finish");
        });
    }
}
//输出
我执行完了
我也执行完了
finish

AnyOf

任意一个任务执行完,就执行anyOf返回的CompletableFuture。如果执行的任务异常,anyOf的CompletableFuture,执行get方法,会抛出异常

csharp
复制代码public class AnyOfFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("我执行完了");
        });
        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
            System.out.println("我也执行完了");
        });
        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m,k)->{
            System.out.println("finish");
//            return "捡田螺的小男孩";
        });
        anyOfFuture.join();
    }
}
//输出
我也执行完了
finish

thenCompose

thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例

  • 如果该CompletableFuture实例的result不为null,则返回一个基于该result新的CompletableFuture实例;
  • 如果该CompletableFuture实例为null,然后就执行这个新任务
arduino
复制代码public class ThenComposeTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> f = CompletableFuture.completedFuture("第一个任务");
        //第二个异步任务
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> "第二个任务", executor)
                .thenComposeAsync(data -> {
                    System.out.println(data); return f; //使用第一个任务作为返回
                }, executor);
        System.out.println(future.join());
        executor.shutdown();

    }
}
//输出
第二个任务
第一个任务

CompletableFuture使用有哪些注意点

CompletableFuture 使我们的异步编程更加便利的、代码更加优雅的同时,我们也要关注下它,使用的一些注意点。

1. Future需要获取返回值,才能获取异常信息

ini
复制代码ExecutorService executorService = new ThreadPoolExecutor(5, 10, 5L,
    TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
      int a = 0;
      int b = 666;
      int c = b / a;
      return true;
   },executorService).thenAccept(System.out::println);
   
 //如果不加 get()方法这一行,看不到异常信息
 //future.get();

Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。小伙伴们使用的时候,注意一下哈,考虑是否加try…catch…或者使用exceptionally方法。

2. CompletableFuture的get()方法是阻塞的。

CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间~

csharp
复制代码//反例
 CompletableFuture.get();
//正例
CompletableFuture.get(5, TimeUnit.SECONDS);

3. 默认线程池的注意点

CompletableFuture代码中又使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。

4. 自定义线程池时,注意饱和策略

CompletableFuture的get()方法是阻塞的,我们一般建议使用future.get(3, TimeUnit.SECONDS)。并且一般建议使用自定义线程池。

但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离哈。

相关文章
|
1天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
24 12
|
5天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
35 6
|
14天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
14天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
38 3
|
15天前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####
|
Java
Java多线程编程核心技术(三)多线程通信(下篇)
线程是操作系统中独立的个体,但这些个体如果不经过特殊的处理就不能成为一个整体。线程间的通信就是成为整体的必用方案之一,可以说,使线程间进行通信后,系统之间的交互性会更强大,在大大提高CPU利用率的同时还会使程序员对各线程任务在处理的过程中进行有效的把控与监督。
687 0
|
Java
Java多线程编程核心技术(三)多线程通信(上篇)
线程是操作系统中独立的个体,但这些个体如果不经过特殊的处理就不能成为一个整体。线程间的通信就是成为整体的必用方案之一,可以说,使线程间进行通信后,系统之间的交互性会更强大,在大大提高CPU利用率的同时还会使程序员对各线程任务在处理的过程中进行有效的把控与监督。
2565 0
|
Java 安全
Java多线程编程核心技术(二)volatile关键字
关键字volatile的主要作用是使变量在多个线程间可见。
893 0
|
Java
Java多线程编程核心技术(一)Java多线程技能
本文为《Java并发编程系列》第一章,主要介绍并发基础概念与API
2447 0
|
Java
<Java多线程编程核心技术>讲解得太细致啦
一个synchronized关键字,能讲一百多页,搞出几十个小举例。 我是服了!
2238 0