android网络业务的封装与调度

简介:

http://ikeepu.com/bar/10266838

手机客户端程序由于网络宽带的约束,尤其在GPRS网络环境下,大数据量的网络交互很大程度上降低应用的响应,影响用户体验。比如,如果做一个手机网盘客户端,在后台上传文件时(大数据量的交互),获取文件列表(命令类的交互)这个过程就显得太别慢。而我们的要求是希望这些命令类操作能尽快得到响应。

 

通常,在手机客户端,我们设计一个网络操作的管理器,来统一管理这些需要联网的操作。

具体做法是把网络操作封装成一个Command(或者说是Task),管理器实现特定的调度规则来调度运行这些Task。

这样做的好处至少有三:

一. 用Command封装了网络操作,使得这些操作与上传的业务分离,解除了强耦合。

二. 可以根据网络情况来确定来采用不同的调度规则,提高用户体验。

三. 重用,这些Task和TaskManager的代码在别的手机应用上基本上能照搬过去。

四. 扩展,当应用需要扩展新的业务时,只有扩展一个新的Command(或者说是Task),接受调度即可,易于扩展。

 

例子:

还是以上文提到的微盘为例,可以概括我们对管理器的设计要求有:

在Wifi网络环境下:

一:各种网络操作可以并行运行。

在GPRS网络环境下:

二:支持优先级抢占调度,命令类操作的优先级比数据传输类的优先级高,当命令类的Task(获取文件列表)提交后,打断数据传输的Task(如上传,下载),等命令类的任务运行完毕,再接着运行数据类任务(断点上传,下载)。

二:同一个优先级的任务可以并行运行,如多个命令一起在网络上传输。

 

实现思路:

TaskManager :

1. TaskManager开辟一个后台线程进行调度工作。

2. 由于要支持多个优先级的抢占调度,我们需要两个队列来维护运行中的Task和等待中的Task。

3. 由于Task的调度是基于优先级的,我们可以使用优先级队列,运行队列采用PriorityQueue,等待队列使用PriorityBlockingQueue,当没有网络业务需要运行时,调度线程阻塞挂起,避免空转。

4. TaskManager设计为单一实例(单一模式)。

5. 每个Task被调度运行时,该Task被从等待队列移动运行队列,当Task执行完毕时,从运行队列删除,唤醒调度线程进行新的调度。

下面是简单的设计代码

public final class TaskEngine implements Runnable{
private PriorityQueue<Task> runningQueue;//运行的task队列
private PriorityBlockingQueue<Task> readyQueue;//就绪的task队列,准备接受调度的task列表
private final AtomicLong taskIdProducer = new AtomicLong(1);//Task Id生成器
private Object sheduleLock = new Object();//同步锁
private static TaskEngine instance;
public long addTask(BusinessObject bo){
  Task task = new Task(bo);
  long newTaskId = taskIdProducer.incrementAndGet();
  task.setTaskId(newTaskId);
  if(this.isWifiNetWork()){ //WIFI网络
  synchronized(sheduleLock){
  runningQueue.add(task);
  } 
  new Thread(task).start();
  }else{ //GPRS网络
  if(readyQueue.offer(task)){ //task入就绪队列 
  final ReentrantLock lock = this.lock;
  lock.lock();
  try{
  needSchedule.signal(); //唤醒调度线程重新调度
  }finally{
  lock.unlock();}
  } 
}
return newTaskId;
}
public final void run(){//task调度逻辑
....
....
}
//挂起调度线程 当不需要调度时
private void waitUntilNeedSchedule() throws InterruptedException
{
.....
}
}

Task:

1. 对要执行的网络操作的封装。

2. Task执行完毕时,发Task结束信号,唤醒调度线程进行新的调度

3. Task要实现Comparable接口,才能让TaskManager的两个队列自动其包含的Task排序。

下面是简单的设计代码

public class Task implements Runnable,Comparable<Task>{
private long taskId;
private BusinessObject bo;//封装网络操作的业务对象的抽象父类,
//它封装了具体的Command,保证了业务扩展中Task的接口不变
@Override
public void run() {
  this.onTaskStart();
  this.bo.execute();
  this.onTaskEnd();
}
private voidonTaskStart()
{...}
public int getPriority()
{...}
public void setPriority(intpriority)
{...}
@Override
public int compareTo(Task object1) {
return this.getPriority()>object1.getPriority()?-1:1;
}
} 

小注意事项:

Android对PriorityQueue的实现和Jdk中的实现有点不一样。

(PriorityQueue.java在android中的代码路径是usr\dalvik\libcore\luni\src\main\java\java\util)

PriorityQueue.remove(Object o) ;//从PriorityQueue中删除一个元素。

对于完成这个删除操作android和jdk都是分两个过程实现,一,找出待删除元素的索引index,二,删除index所在元素。

在JDK中,是通过调用元素的equals方法来找到待删除元素的索引,

private int indexOf(Object o) {
if (o != null) {
for (int i = 0; i < size; i++)
if (o.equals(queue[i]))
return i;
}
return -1;
}

在android中,是间接调用元素的compareTo方法判断结果是否为0来找到待删除元素的索引,

int targetIndex;
for (targetIndex = 0; targetIndex < size; targetIndex++) {
if (0 == this.compare((E) o, elements[targetIndex])) {
break;
}
}
private int compare(E o1, E o2) {
if (null != comparator) {
return comparator.compare(o1, o2);
}
return ((Comparable<? super E>) o1).compareTo(o2);
}

所以为了Task能在执行完毕时从PriorityQueue找到这个Task并删除之,需要在compareTo方法里在优先级相等时返回0。

@Override
public int compareTo(Task object1) {
if(this.getPriority()==object1.getPriority())
return 0;
return this.getPriority()>object1.getPriority()?-1:1;
}

当是这样运行PriorityQueue.remove(Object o) 逻辑上只能删除PriorityQueue里第一个优先级与被删除的元素优先级相等的元素(可能是待删除的元素也可能不是),有误删的可能,需要做如下修改:

@Override
public int compareTo(Task object1) {
if(this.getPriority()==object1.getPriority() && this.equals(object1))
return 0;
return this.getPriority()>object1.getPriority()?-1:1;
}

这样才能正确执行remove(Object o),删除指定的对象o。

 

个人觉得android这样设计使得remove(Object o)复杂化了,不然JDK中那么简洁。语义上也不是那么好懂了,

因为按通常理解,equals是判断两个对象是否相等,compareTo可能是对象的某个属性的比较(类别数据库中的order by),

而现在执行PriorityQueue.remove(Object o),这个对象o明明在容器PriorityQueue中,却删除不了,除非去翻看android中PriorityQueue的实现代码,然后重写compareTo这个方法。这样的API使用时比较容易出错,应该不符号良好的API设计规范

吧。

完整的代码实现如下:


 * @类名:TaskEngine
  * @创建:baiyingjun (devXiaobai@gmail.com)
  * @创建日期:2011-7-7
  * @说明:task调度引擎
  ***************************************************/
 public final class TaskEngine implements Runnable{
 
     private static final String TAG=Log.makeTag(TaskEngine.class);
     
     private PriorityQueue<Task> runningQueue;//运行的task队列
     
     private PriorityBlockingQueue<Task> readyQueue;//就绪的task队列,准备接受调度的task列表
     
     private final AtomicLong taskIdProducer = new AtomicLong(1);
     
     private Object sheduleLock = new Object();//调度锁
     
     private static TaskEngine instance;
     
     private final ReentrantLock lock = new ReentrantLock(true); 
     
     private final Condition needSchedule = lock.newCondition();
     
     private Task currentTask;//准备接受调度的task
     
     private Context mAppContext;
     
     /**
      * add BusinessObject to taskEngine
 */
     public long addTask(BusinessObject bo) throws NetworkNotConnectException{
         Task task = new Task(bo);
         long newTaskId = taskIdProducer.incrementAndGet();
         task.setTaskId(newTaskId);
         if(Log.DBG){
             Log.d(TAG, "Add task with task id "+newTaskId+", priority "+task.getPriority());
         }
         
         if(this.isWifiNetWork()){ //WIFI网络
             synchronized(sheduleLock){
                 runningQueue.add(task);
             }
             new Thread(task).start();
         }else{ //GPRS网络
             if(readyQueue.offer(task)){ //task入就绪队列
                 if(Log.DBG)
                     Log.d(TAG, "add task " +task.bo.methodName+" "+task.taskId+" to ready queue");
                 final ReentrantLock lock = this.lock;
                 lock.lock();
                 try{
                     needSchedule.signal(); //唤醒调度线程重新调度
                 }finally{
                     lock.unlock();
                 }
             }
             //schedule();
         }
         return newTaskId;
     }
     
     private TaskEngine(Context context){
         mAppContext = context;
         runningQueue = new PriorityQueue<Task>();
         readyQueue = new PriorityBlockingQueue<Task>();
         new Thread(this).start();
         Log.i(TAG, "shedule thread working");
     }
     
     public synchronized static TaskEngine getInstance(Context context){
         Context appContext = context.getApplicationContext();
         if(instance==null || instance.mAppContext!=appContext){
             instance=new TaskEngine(appContext);
         }
         return instance;
     }
     
     protected boolean isWifiNetWork() throws NetworkNotConnectException{
         return NetworkManager.isWIFINetWork(mAppContext);
     }
     
     /**
      * task调度逻辑
 */
     public final void run(){
         Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
         while(true){
             try {
                 if(this.isWifiNetWork()){
                     Task task = this.readyQueue.take();
                     if(task !=null){
                         synchronized(sheduleLock){
                             runningQueue.add(task);
                         }
                         new Thread(task).start();
                     }
                 }
                 else{//非wifi网络
 //空就绪队列,空运行队列,等待直到有任务到来
                     if(this.readyQueue.size()==0 && runningQueue.size()==0){
                         currentTask=readyQueue.take();
                         synchronized(sheduleLock){
                             runningQueue.add(currentTask);
                         }
                         new Thread(currentTask).start();
                     }
                     //抢占式调度(就绪队列非空,运行队列优先级比就绪队列优先级低)
                     else if(readyQueue.size()>0 && 
                             this.readyQueue.element().getPriority()>=this.getMaxPriority()){
                         currentTask = readyQueue.take();
                         if(currentTask.getPriority()>this.getMaxPriority()){//暂停低优先级的任务运行
                             synchronized(sheduleLock){
                                 for(int i=0;i<runningQueue.size();i++){
                                     Task toStopTask =runningQueue.remove();
                                     //因为任务调度,将低优先级的任务暂时给冻结起来
                                     toStopTask.setState(Task.STATE_FROST);
                                     readyQueue.add(toStopTask);
                                 }
                             }
                         }
                         //运行被调度的任务
                         runningQueue.add(currentTask);
                         new Thread(currentTask).start();
                     }else {//等高优先级的任务运行完毕
                         waitUntilNeedSchedule();
                     }
                 }
             
             }catch (InterruptedException e) {
                 Log.e(TAG, "Schedule error "+e.getMessage());
             } catch (NetworkNotConnectException e) {
                 // TODO Auto-generated catch block
                 e.printStackTrace();
             }
         }
     }
 
     /*
      * 等待,直到就绪队列里的最高优先级比当前运行优先级高,或者就绪队列为空时等待运行队列运行完毕
 */
     private void waitUntilNeedSchedule() throws InterruptedException{
          final ReentrantLock lock = this.lock;
          lock.lockInterruptibly();
             try {
                 try{
                     while ((readyQueue.size()>0 
                             && readyQueue.element().getPriority()<getMaxPriority())//等高优先级的任务运行完毕
                             || (readyQueue.size()==0 && runningQueue.size()>0) ){//或者等运行队列运行完毕
                         if(Log.DBG)
                             Log.d(TAG, "waiting sheduling........");
                         needSchedule.await();
                     }
                 }
                 catch (InterruptedException ie) {
                     needSchedule.signal(); // propagate to non-interrupted thread
                     throw ie;
                 }
             } finally {
                 lock.unlock();
             }
     }
     
     /**
      * Hand the specified task ,such as pause,delete and so on
 */
     public boolean handTask(long taskId,int handType) {
         Log.i(TAG, "set task`s state with taskId "+taskId);
         synchronized(this.sheduleLock){
             //如果在运行队列里,取消该任务
             Iterator<Task> runningItor= this.runningQueue.iterator();
             while(runningItor.hasNext()){
                 Task task = runningItor.next();
                 boolean b = task.equals(this);
                 if(task.getTaskId()==taskId){
                     runningQueue.remove(task);
                     task.setState(handType);
                     Log.i(TAG, "set runningQueue taskId = "+taskId + " state " + handType);
                     return true;
                 }
             }
             //如果在就绪队列里,删除
             Iterator<Task> readyItor= this.readyQueue.iterator();
             while(readyItor.hasNext()){
                 Task task = readyItor.next();
                 if(task.getTaskId()==taskId){
 //                    readyQueue.remove(task);
                     task.setState(handType);
                     Log.i(TAG, "set readyQueue taskId = "+taskId + " state " + handType);
                     return true;
                 }
             }
             return false;
         }
     }
     /***
      * 获取运行队列任务的最高优先级
 */
     private int getMaxPriority(){
         if(this.runningQueue==null || this.runningQueue.size()==0)
             return -1;
         else{
             return this.runningQueue.element().getPriority();
         }
     }
         
     /***************************************************
      * @类名:Task
      * @创建:baiyingjun (devXiaobai@gmail.com)
      * @创建日期:2011-7-7
      * @说明:业务对象的包装成可运行实体
      ***************************************************/
     public class Task implements Runnable,Comparable<Task>{
 
         //运行
         public static final int STATE_INIT = -1;
         //运行
         public static final int STATE_RUN = 0;
         //停止
         public static final int STATE_STOP = 1;
         //暂停
         public static final int STATE_PAUSE = 2;
         //取消
         public static final int STATE_CANCLE = 3;
         //冻结,如果在GPRS下,因为线程调度的时候低优先级的被放readyqueue里的时候,要把这个任务暂时给“冻结”起来
         public static final int STATE_FROST = 4;
         
         private long taskId;
         
         private BusinessObject bo;
         
         public Task(){
             
         }
         
         public Task(BusinessObject bo){
             this.bo=bo;
         }
         
         @Override
         public void run() {
             this.onTaskStart();
             this.bo.execute();
             this.onTaskEnd();
         }
 
         private void onTaskStart(){
             this.bo.setmState(STATE_RUN);
         }
         
         public long getTaskId() {
             return taskId;
         }
 
         public void setTaskId(long taskId) {
             this.taskId = taskId;
         }
 
         public int getPriority() {
             return this.bo.getPriority();
         }
 
         public void setPriority(int priority) {
             this.bo.setPriority(priority);
         }
 
         /* 
          * compare task priority
 */
         @Override
         public int compareTo(Task object1) {
             if(this.getPriority()==object1.getPriority()&& this.equals(object1))
                 return 0;
             return this.getPriority()>object1.getPriority()?-1:1;
         }
         
         public void setState(int state){//设置当前运行的task的state
             this.bo.setmState(state);
             Log.d(TAG, "Set task "+this.bo.methodName+" "+this.taskId + " state " + state);
         }
         
         private void onTaskEnd(){//运行完毕后从taskengine运行队列里删除
             if(Log.DBG){
                 Log.d(TAG, "task "+this.bo.methodName+" "+taskId+" End");
             }
             if(this.bo.getmState() == STATE_FROST)//因为调度停止了该业务
                 return;
             
             final ReentrantLock lock = TaskEngine.this.lock;
             lock.lock();
             try{
                 boolean removed = runningQueue.remove(this); //remove from running queue
                 assert removed;
                 if(Log.DBG)
                     Log.d(TAG, this.bo.methodName+" "+this.taskId+" remove from runningQueue");
                 needSchedule.signal(); //唤醒调度线程重新调度
             }finally{
                 lock.unlock();
             }
         }
 
         @Override
         public boolean equals(Object o) {
             // TODO Auto-generated method stub
             if(this==o){
                 return true;
             }
             if(o instanceof Task){
                 return taskId==((Task)o).taskId;
             }
             return false;
         }
     }
     
 }

拾漏补遗:

1.补充最初的设计类图(可能与代码不太一致,但能说明问题)

2. BusinessObject的实现代码:

 /***************************************************
  * @类名:BusinessObject
  * @创建:baiyingjun (devXiaobai@gmail.com)
  * @创建日期:2011-7-6
  * @说明:抽象的业务,扩展的网络业务要继承这个类并实现抽象方法execute()
  ***************************************************/
 public abstract class BusinessObject {
     
     public static final int LOWEST_PRIORITY=1;//最低优先级
     
     public static final int LOW_PRIORITY=2;//低优先级
     
     public static final int NORMAL_PRIORITY=3;//正常优先级
     
     public static final int HIGH_PRIORITY=4;//高优先级
     
     public static final int HIGHEST_PRIORITY=5;//最高优先级
     
     protected BusinessListener listnener;//运行业务的回调
     
     protected Context mContext;
     
     private long taskId;
     
     protected Map<String,Object> params;//运行业务需要的参数
     
     private int priority;
     
     public int getPriority() {
         return priority;
     }
 
     public void setPriority(int priority) {
         this.priority = priority;
     }
 
     //设置回调
     public void addBusinessListener(BusinessListener listnener){
         this.listnener=listnener;
     }
     
     public long doBusiness() throws NetworkNotConnectException{
         taskId= TaskEngine.getInstance(mContext).addTask(this);
         return taskId;
     }
     
     public abstract void execute();
 }


相关文章
|
13天前
|
网络协议 Java 程序员
【网络】局域网LAN、广域网WAN、TCP/IP协议、封装和分用
【网络】局域网LAN、广域网WAN、TCP/IP协议、封装和分用
17 2
|
2月前
|
安全 网络安全 Android开发
安卓与iOS开发:选择的艺术网络安全与信息安全:漏洞、加密与意识的交织
【8月更文挑战第20天】在数字时代,安卓和iOS两大平台如同两座巍峨的山峰,分别占据着移动互联网的半壁江山。它们各自拥有独特的魅力和优势,吸引着无数开发者投身其中。本文将探讨这两个平台的特点、优势以及它们在移动应用开发中的地位,帮助读者更好地理解这两个平台的差异,并为那些正在面临选择的开发者提供一些启示。
119 56
|
9天前
|
网络协议 网络架构
【第三期】计算机网络常识/网络分层模型与数据包封装传输过程
【第三期】计算机网络常识/网络分层模型与数据包封装传输过程
27 0
|
2月前
|
算法 安全 网络安全
探索操作系统核心:进程调度的奥秘网络安全的盾牌与剑——漏洞防御与加密技术
【8月更文挑战第30天】在数字世界的每一次点击和命令背后,都隐藏着一个不为人知的英雄——进程调度器。它默默无闻地在后台工作,确保我们的命令得以流畅执行。本文将揭开这位幕后英雄的面纱,带你了解进程调度的原理、重要性以及它是如何在操作系统中发挥作用的。无论你是编程新手还是资深开发者,理解进程调度都能帮你更好地掌握计算机的运作原理。准备好深入操作系统的核心,一探究竟了吗?让我们开始吧!
|
2月前
|
小程序 数据安全/隐私保护
Taro@3.x+Vue@3.x+TS开发微信小程序,网络请求封装
在 `src/http` 目录下创建 `request.ts` 文件,并配置 Taro 的网络请求方法 `Taro.request`,支持多种 HTTP 方法并处理数据加密。
Taro@3.x+Vue@3.x+TS开发微信小程序,网络请求封装
|
2月前
|
达摩院 供应链 JavaScript
网络流问题--仓储物流调度【数学规划的应用(含代码)】阿里达摩院MindOpt
本文通过使用MindOpt工具优化仓储物流调度问题,旨在提高物流效率并降低成本。首先,通过考虑供需匹配、运输时间与距离、车辆容量、仓库储存能力等因素构建案例场景。接着,利用数学规划方法,包括线性规划和网络流问题,来建立模型。在网络流问题中,通过定义节点(资源)和边(资源间的关系),确保流量守恒和容量限制条件下找到最优解。文中还详细介绍了MindOpt Studio云建模平台和MindOpt APL建模语言的应用,并通过实例展示了如何声明集合、参数、变量、目标函数及约束条件,并最终解析了求解结果。通过这些步骤,实现了在满足各仓库需求的同时最小化运输成本的目标。
|
2月前
|
安全 网络安全 Android开发
探索安卓开发之旅:从新手到专家网络安全与信息安全:防范网络威胁,保护数据安全
【8月更文挑战第29天】在这篇技术性文章中,我们将踏上一段激动人心的旅程,探索安卓开发的世界。无论你是刚开始接触编程的新手,还是希望提升技能的资深开发者,这篇文章都将为你提供宝贵的知识和指导。我们将从基础概念入手,逐步深入到安卓开发的高级主题,包括UI设计、数据存储、网络通信等方面。通过阅读本文,你将获得一个全面的安卓开发知识体系,并学会如何将这些知识应用到实际项目中。让我们一起开启这段探索之旅吧!
|
2月前
|
安全 网络安全 调度
云计算与网络安全:云服务、网络安全、信息安全等技术领域深入理解操作系统:进程调度的奥秘
【8月更文挑战第27天】本文将探讨云计算与网络安全之间的关联,包括云服务、网络安全、信息安全等技术领域。我们将通过代码示例来展示如何保护云计算环境中的数据和应用程序。最后,我们将讨论如何应对云计算环境中的网络安全挑战,以保护数据和应用程序免受攻击。
|
3月前
|
缓存 负载均衡 算法
(四)网络编程之请求分发篇:负载均衡静态调度算法、平滑轮询加权、一致性哈希、最小活跃数算法实践!
先如今所有的技术栈中,只要一谈关于高可用、高并发处理相关的实现,必然会牵扯到集群这个话题,也就是部署多台服务器共同对外提供服务,从而做到提升系统吞吐量,优化系统的整体性能以及稳定性等目的。
|
3月前
|
算法 网络性能优化 调度
基于De-Jitter Buffer算法的无线网络业务调度matlab仿真,对比RR调度算法
1. **功能描述**: 提出了一个去抖动缓冲区感知调度器,结合用户终端的缓冲状态减少服务中断。该算法通过动态调整数据包发送速率以优化网络延迟和吞吐量。 2. **测试结果**: 使用MATLAB 2022a进行了仿真测试,结果显示De-Jitter Buffer算法在网络拥塞时比RR调度算法更能有效利用资源,减少延迟,并能根据网络状态动态调整发送速率。 3. **核心程序**: MATLAB代码实现了调度逻辑,包括排序、流量更新、超时和中断处理等功能。 仿真结果和算法原理验证了De-Jitter Buffer算法在无线网络调度中的优势。