前言
学完了并发编程,是否真的能够灵活应用其思想呢?
实践才是检验真理的唯一标准,好记性不如烂笔头。
下面就让我以我一个朋友社招面试钉钉的一道面试题来讲解下并发编程的实际应用吧。
问题描述
// 假设我们有如下代码,query 是公共方法会提供给任意业务方调用,请完成 query 方法 // 要求:多线程情况下 loadFromServer 调用次数最多只执行一次,且每次调用query方法要有回调回来的数据 public class Main { private Executor mExecutor = Executors.newFixedThreadPool(4); private Executor mServerExecutor = Executors.newFixedThreadPool(4); private Data mData; public void queryData(Callback callback) { if (callback == null) { return; } mExecutor.execute(new Runnable() { @Override public void run() { // todo 代码写在这 } }); } private void loadFromServer(Callback callback) { mServerExecutor.execute(new Runnable() { @Override public void run() { // mock try { Thread.sleep(5000L); } catch (InterruptedException e) { throw new RuntimeException(e); } if (callback != null) { callback.onSuccess(new Data()); } } }); } public static class Data { } public interface Callback { void onSuccess(Data data); } }
测试类
public class Test { private static volatile int cnt = 0; public static void main(String[] args) throws InterruptedException { Main main = new Main(); for (int i = 0 ; i < 5; i++) { new Thread(() -> { main.queryData(new Main.Callback() { @Override public void onSuccess(Main.Data data) { if (data == null) { System.out.println("data is null"); } else { System.out.println("getData is " + data); } ++cnt; } }); }).start(); } Thread.sleep(20000L); System.out.println("cnt = " + cnt); } }
这道题的本质就是说,多线程情况下 loadFromServer 调用次数最多只执行一次,且每次调用query方法要有回调回来的数据,光从题意上我们能够很清楚的想到思路,这并不难。
解决思路
常规解法
首先能想到的是,这一看不就是很像多线程情况下的单例模式?其能保证多线程情况下 loadFromServer 调用次数最多只执行一次,但是还需要每次调用query方法要有回调回来的数据,也就是说,假如一次来了五个调用,那么其他调用要等loadFromServer 调用过一次之后,才能够返回,这不就是典型的线程同步问题,可以使用CountDownLatch来实现。
基于此分析,那么我们针对这个问题就非常清晰了,这也是立马能想到的解法之一了。
import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; // 假设我们有如下代码,query 是公共方法会提供给任意业务方调用,请完成 query 方法 // 要求:多线程情况下 loadFromServer 调用次数最多只执行一次,且每次调用query方法要有回调回来的数据 public class Main { private Executor mExecutor = Executors.newFixedThreadPool(4); private Executor mServerExecutor = Executors.newFixedThreadPool(4); private Data mData; // 定义一个 volatile 变量来保证线程可见性和禁止指令重排序 private volatile boolean mHasLoadFromServer = false; private CountDownLatch mLatch = new CountDownLatch(1); public void queryData(Callback callback) { if (callback == null) { return; } mExecutor.execute(new Runnable() { @Override public void run() { // 双重检查加锁 if (!mHasLoadFromServer) { synchronized (Main.this) { if (!mHasLoadFromServer) { loadFromServer(new Callback() { @Override public void onSuccess(Data data) { mData = data; mLatch.countDown(); } }); mHasLoadFromServer = true; try { mLatch.await(); // 等待 loadFromServer 执行完成 } catch (InterruptedException e) { e.printStackTrace(); } } } } callback.onSuccess(mData); } }); } private void loadFromServer(Callback callback) { mServerExecutor.execute(new Runnable() { @Override public void run() { // mock try { Thread.sleep(5000L); } catch (InterruptedException e) { throw new RuntimeException(e); } mData = new Data(); if (callback != null) { callback.onSuccess(mData); } } }); } public static class Data { } public interface Callback { void onSuccess(Data data); } }
运行结果:
getData is Main$Data@17f2e0c9 getData is Main$Data@17f2e0c9 getData is Main$Data@17f2e0c9 getData is Main$Data@17f2e0c9 getData is Main$Data@17f2e0c9 cnt = 5
问题
我们重点看一下这块的代码
public void queryData(Callback callback) { if (callback == null) { return; } mExecutor.execute(new Runnable() { @Override public void run() { // 双重检查加锁 if (!mHasLoadFromServer) { synchronized (Main.this) { if (!mHasLoadFromServer) { loadFromServer(new Callback() { @Override public void onSuccess(Data data) { mData = data; mLatch.countDown(); } }); mHasLoadFromServer = true; try { mLatch.await(); // 等待 loadFromServer 执行完成 } catch (InterruptedException e) { e.printStackTrace(); } } } } callback.onSuccess(mData); } }); }
我们每次其实都需要进行一个锁的判断,假如说后续,如果后续mData不为null,其实是可以直接调用返回的,并不需要进行判断和锁的竞争,这也是性能并不好的情况。
修改:
public void queryData(Callback callback) { if (callback == null) { return; } if (mData != null) { callback.onSuccess(mData); return; } mExecutor.execute(new Runnable() { @Override public void run() { // 双重检查加锁 if (!mHasLoadFromServer) { synchronized (Main.this) { if (!mHasLoadFromServer) { loadFromServer(new Callback() { @Override public void onSuccess(Data data) { mData = data; mLatch.countDown(); } }); mHasLoadFromServer = true; try { mLatch.await(); // 等待 loadFromServer 执行完成 } catch (InterruptedException e) { e.printStackTrace(); } } } } callback.onSuccess(mData); } }); }
但是该方法可能还存在问题,假如在等待的过程中,积攒了太多太多的请求,那么我们集成进行回调的时候,可能超过我们服务器所能承受的阈值,那么可能会影响影响,为此可以采用其他解法来实现。
其他解法
public class Main { private Executor mExecutor = Executors.newFixedThreadPool(4); private Executor mServerExecutor = Executors.newFixedThreadPool(4); private Data mData; private volatile boolean mIsLoading = false; private List<Callback> mCallbacks = new ArrayList<>(); public void queryData(Callback callback) { if (callback == null) { return; } if (mData != null) { callback.onSuccess(mData); return; } synchronized (this) { if (mIsLoading) { // 数据正在加载中,等待回调 // 将回调函数添加到数据加载完成后的回调列表中 mCallbacks.add(callback); return; } mIsLoading = true; mCallbacks.add(callback); } mExecutor.execute(new Runnable() { @Override public void run() { if (mData == null) { loadFromServer(new Callback() { @Override public void onSuccess(Data data) { System.out.println("loadFromServer"); mData = data; notifyCallbacks(data); } }); } else { // 数据已经加载完成,直接返回 callback.onSuccess(mData); } } }); } private void loadFromServer(Callback callback) { mServerExecutor.execute(new Runnable() { @Override public void run() { // mock try { Thread.sleep(5000L); } catch (InterruptedException e) { throw new RuntimeException(e); } if (callback != null) { callback.onSuccess(new Data()); } } }); } private void notifyCallbacks(Data data) { synchronized (this) { for (Callback callback : mCallbacks) { callback.onSuccess(data); } mCallbacks.clear(); } } public static class Data { } public interface Callback { void onSuccess(Data data); } }
这种解法是采用一种回调集合的方法,假如说等待回调的请求过多,完全可以采用生产者消费者的思想来实现,基于回调集合,等到将来回调的时候,根据实际的一个性能阈值从回调集合中进行回调,使得系统能够稳定的运行。
总结
其实这个问题不仅仅想说一些解法的小细节,还是想说,其实这个面试题,更像是真实业务模型中抽取出来的,很偏向于业务开发,当我们学习完并发编程的时候,能够学习这样真实的业务模型,并能针对不同的场景进行分析,就能够触类旁通,更好的将并发编程的解决思路应用于实际问题的解决中去。