上一篇已经熟悉了Observable的基本用法,但是如果仅仅只是“生产-消费”的模型,这就体现不出优势了,java有100种办法可以玩这个:)
一、更简单的多线程
正常情况下,生产者与消费者都在同一个线程里处理,参考下面的代码:
final long start = System.currentTimeMillis(); Observable<String> fileSender = Observable.create(emitter -> { for (int i = 1; i < 6; i++) { Thread.sleep(1000); String temp = "thread:" + Thread.currentThread().getId() + " , file " + i + " 的内容"; System.out.println(temp); emitter.onNext(temp); } emitter.onComplete(); }); Observer<String> fileHander = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("准备处理文件..."); } @Override public void onNext(@NonNull String s) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread:" + Thread.currentThread().getId() + " , [" + s + "] 已处理!"); } @Override public void onError(@NonNull Throwable e) { System.out.println("师傅,有妖怪!"); } @Override public void onComplete() { System.out.println("总算完事儿,累屎大爷了!"); long end = System.currentTimeMillis(); System.out.println("耗时:" + (end - start)); } }; fileSender.subscribe(fileHander); Thread.sleep(60000);
假设生产者在读取一堆文件,然后发给消费者处理,通常情况下,这类涉及IO的操作都是很耗时的,我们用sleep(1000)来模拟。
从输出结果上看,生产者与消费者的thread id相同,耗时约为10s。
fileSender.subscribe(fileHander);
如果上面这行,换成
fileSender.subscribeOn(Schedulers.io()) //生产者处理时,放在io线程中 .observeOn(Schedulers.newThread()) //消费者处理时,用新线程 .subscribe(fileHander);
可以看到二个线程id不一样,说明分别在不同的线程里,而且总耗时明显缩短了。
二、更平滑的链式调用
假设我们有一个经典的在线电商场景:用户提交订单后,马上跳到支付页面付款。传统写法,通常是中规中矩的封装2个方法,依次调用。用rxjava后,可以写得更流畅,先做点准备工作:
先定义二个服务接口:订单服务(OrderService)以及支付服务(PayService)
OrderService.java
public interface OrderService { Observable<CreateOrderResponse> createOrder(CreateOrderRequest request) throws Exception; }
PayService.java
public interface PayService { Observable<PayResponse> payOrder(PayRequest request) throws Exception; }
然后来二个实现:
OrderServiceImpl
public class OrderServiceImpl implements OrderService { @Override public Observable<CreateOrderResponse> createOrder(CreateOrderRequest request) throws InterruptedException { System.out.println("threadId:" + Thread.currentThread().getId() + ", 订单创建中:" + request.toString()); CreateOrderResponse response = new CreateOrderResponse(); response.setOrderNo(UUID.randomUUID().toString().replace("-", "")); response.setOrderStatus("NEW"); response.setOrderAmount(request.getOrderAmount()); response.setOrderDesc(request.getOrderDesc()); return Observable.create(emitter -> emitter.onNext(response)); } }
PayServiceImpl
public class PayServiceImpl implements PayService { @Override public Observable<PayResponse> payOrder(PayRequest request) throws InterruptedException { System.out.println("threadId:" + Thread.currentThread().getId() + ", 正在请求支付:" + request); PayResponse response = new PayResponse(); response.setSuccess(true); response.setOrderNo(request.getOrderNo()); response.setTradeNo(UUID.randomUUID().toString().replace("-", "")); return Observable.create(emitter -> emitter.onNext(response)); } }
然后测试一把:
@Test public void test1() throws Exception { OrderService orderService = new OrderServiceImpl(); PayService payService = new PayServiceImpl(); orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00))) //创建订单 //将"创建订单的Response" 转换成 "支付订单的Response" .flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount()))) //支付完成的处理 .subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成")); Thread.sleep(1000);//等待执行完毕 }
链式的写法,更符合阅读习惯,注:flatMap这个操作,通俗点讲,就是将一种口径的子弹,转换成另一种口径的子弹,然后再继续发射。
输出:
threadId:1, 订单创建中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888) threadId:1, 正在请求支付:PayRequest(orderNo=81419b0580d547acbb53955978ace6b8, paymentAmount=8888) threadId:1, 支付完成
可以看到,默认情况下,创建订单/支付订单在同一个线程中,结合前面学到的知识,也可以将它们划分到不同的线程里:(虽然就这个场景而言,这样做的意义不大,因为支付前,肯定要等订单先提交,这个没办法并发处理,这里只是意思一下,可以这样做而已)
@Test public void test2() throws Exception { OrderService orderService = new OrderServiceImpl(); PayService payService = new PayServiceImpl(); orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00))) .subscribeOn(Schedulers.newThread()) //(生产者)创建订单时,使用新线程 .observeOn(Schedulers.newThread()) //(消费者1)接收订单时,使用新线程 .flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount()))) .observeOn(Schedulers.newThread()) //(消费者2)接收支付结果时,使用新线程 .subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成")); Thread.sleep(1000);//等待执行完毕 }
输出:
threadId:1, 订单创建中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888) threadId:13, 正在请求支付:PayRequest(orderNo=d5ff7890f22f486bb1bf8aa8e4f0a3bf, paymentAmount=8888) threadId:14, 支付完成
从threadId看,已经是不同的线程了。
上面的代码,都没考虑到出错的情况,如果支付时出异常了,rxjava如何处理呢?
先改下支付的实现,人为抛个异常:
public class PayServiceImpl implements PayService { @Override public Observable<PayResponse> payOrder(PayRequest request) throws Exception { throw new Exception("支付失败!"); } }
rxjava里有一个重载版本,见: io.reactivex.Observable
@CheckReturnValue @SchedulerSupport("none") public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) { return this.subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer()); }
使用这个版本即可:
@Test public void test3() throws Exception { OrderService orderService = new OrderServiceImpl(); PayService payService = new PayServiceImpl(); orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00))) .flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount()))) .subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"), //异常处理 err -> System.out.println("支付出错啦:" + err.getMessage())); Thread.sleep(1000);//等待执行完毕 }
输出:
threadId:1, 订单创建中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888) 支付出错啦:支付失败!
如果想在订单创建完后,先做些处理,再进行支付,可以这么写:
@Test public void test4() throws Exception { OrderService orderService = new OrderServiceImpl(); PayService payService = new PayServiceImpl(); orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00))) //订单创建完成后的处理 .doOnNext(response -> System.out.println("订单创建完成:" + response)) .flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount()))) .subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"), err -> System.out.println("支付出错啦:" + err.getMessage())); Thread.sleep(1000);//等待执行完毕 }
输出:
threadId:1, 订单创建中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888) 订单创建完成:CreateOrderResponse(orderNo=8c194b1d07c044a8af3771159e1bb2bf, orderDesc=iphone X, orderAmount=8888, orderStatus=NEW) 支付出错啦:支付失败!
最后再说下flatMap与concatMap,看下面二个示例就明白差异:
@Test public void flatMapTest() throws InterruptedException { Observable.create((ObservableOnSubscribe<Integer>) emitter -> { for (int i = 0; i < 10; i++) { emitter.onNext(i); } }).flatMap((Function<Integer, ObservableSource<String>>) integer -> Observable.fromArray(integer + "") .delay(10, TimeUnit.MILLISECONDS) ) .subscribe(s -> System.out.print(s + " ")); Thread.sleep(5000); }
输出:0 1 5 9 2 3 7 4 6 8
@Test public void concatMapTest() throws InterruptedException { Observable.create((ObservableOnSubscribe<Integer>) emitter -> { for (int i = 0; i < 10; i++) { emitter.onNext(i); } }).concatMap((Function<Integer, ObservableSource<String>>) integer -> Observable.fromArray(integer + "") .delay(10, TimeUnit.MILLISECONDS) ) .subscribe(s -> System.out.print(s + " ")); Thread.sleep(5000); }
输出:0 1 2 3 4 5 6 7 8 9
结论:flatMap不保证顺序,concatMap能保证顺序
出处: http://yjmyzz.cnblogs.com
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。