【转载】相对平均分布

简介:

本文转载自http://shift-alt-ctrl.iteye.com/blog/1961598

 

有一个项目用来负责调度集群中的"cron任务",比如一个application中可以配置N个定时任务,这些任务信息最终注册到zookeeper上,并开发了一系列代码用于维护这些任务的"活性";当applicaton中一个server故障,那么这个server上接管的任务,需要迁移到其他server上,如果多个server存活的话,还需要这些任务能够"均衡"的分布.

   其中"负载均衡",很好理解,比如有6个任务,3个server,那么就需要每个server上尽可能的运行2个任务;其实这个事情想起来很简单,但是做起来似乎有些不得不考虑的问题:

    1) "相对平均"怎么设计

    2) 迁移任务时,是否会丢失任务的触发时机;比如一个任务凌晨3点执行,刚好此时运行了一次"均衡",任务在原来的server上没有触发,在新的server上又过了时间..

    3) 迁移任务时,还需要考虑"最少移动"次数,不能大面积迁移任务;只能从"负载高"的server上迁移到"负载低"的.

 

例如:

    sid1: w1 w2 w3 w4

    sid2: w5

    sid3:w6

期望迁移之后:

    sid1:w1 w2

    sid2:w5 w3

    sid3:w4 w6

 

而不是(这种结果迁移的面积太大,只需要把"多余"的任务迁移出去即可,而不是重新洗牌再均衡)

    sid1:w6 w5

    sid2:w2 w3

    sid3:w1 w4

   

经过提取,"相对平均"的设计代码如下,仅作备忘:

Java代码   收藏代码
  1. package com.test.demo.zookeeper;  
  2.   
  3. import java.io.BufferedReader;  
  4. import java.io.InputStreamReader;  
  5. import java.util.*;  
  6.   
  7. public class WorkersBalanceMain {  
  8.     private List<String> servers = new ArrayList<String>();  
  9.     private Map<String, List<String>> current = new HashMap<String, List<String>>();  
  10.     private Set<String> workers = new HashSet<String>();  
  11.   
  12.     public static void main(String[] args) {  
  13.         BufferedReader br = new BufferedReader(new InputStreamReader(System.in));  
  14.         String line;  
  15.         Set<String> servers = new HashSet<String>();  
  16.         WorkersBalanceMain balancer = new WorkersBalanceMain();  
  17.         try {  
  18.             while ((line = br.readLine()) != null) {  
  19.                 if (line.startsWith("addWorker")) {  
  20.                     balancer.addWorkers(line);  
  21.                 } else if (line.startsWith("addServer")) {  
  22.                     balancer.addServers(line);  
  23.                 } else {  
  24.                     System.out.println("???");  
  25.                     continue;  
  26.                 }  
  27.                 balancer.rebalance();  
  28.             }  
  29.         } catch (Exception e) {  
  30.             e.printStackTrace();  
  31.         }  
  32.         System.out.println("--END---");  
  33.     }  
  34.   
  35.     public void addServers(String source) {  
  36.         int index = source.indexOf(" ");  
  37.         if (index == -1) {  
  38.             return;  
  39.         }  
  40.         String[] values = source.substring(index + 1).split(" ");  
  41.         if (values == null || values.length == 0) {  
  42.             return;  
  43.         }  
  44.         for (String server : values) {  
  45.             servers.add(server);  
  46.             if(current.get(server) == null){  
  47.                 current.put(server,new ArrayList<String>());  
  48.             }  
  49.         }  
  50.     }  
  51.   
  52.     public void addWorkers(String source) {  
  53.         int index = source.indexOf(" ");  
  54.         if (index == -1) {  
  55.             return;  
  56.         }  
  57.         String[] values = source.substring(index + 1).split(" ");  
  58.         if (values == null || values.length == 0) {  
  59.             return;  
  60.         }  
  61.         //当有新的worker提交时,将咱有一台机器接管  
  62.         String sid = servers.get(0);  
  63.         List<String> sw = current.get(sid);  
  64.         if(sw == null){  
  65.             current.put(sid,new ArrayList<String>());  
  66.         }  
  67.         for (String worker : values) {  
  68.             workers.add(worker);  
  69.             sw.add(worker);  
  70.         }  
  71.   
  72.     }  
  73.   
  74.     public void rebalance() {  
  75.         try {  
  76.             if (workers.isEmpty()) {  
  77.                 return;  
  78.             }  
  79.             for (String sid : servers) {  
  80.                 if (current.get(sid) == null) {  
  81.                     current.put(sid, new ArrayList<String>());  
  82.                 }  
  83.             }  
  84.             //根据每个sid上的worker个数,整理成一个排序的map  
  85.             TreeMap<Integer, List<String>> counterMap = new TreeMap<Integer, List<String>>();  
  86.             for (Map.Entry<String, List<String>> entry : current.entrySet()) {  
  87.                 int total = entry.getValue().size();  
  88.                 List<String> sl = counterMap.get(total);  
  89.                 if (sl == null) {  
  90.                     sl = new ArrayList<String>();  
  91.                     counterMap.put(total, sl);  
  92.                 }  
  93.                 sl.add(entry.getKey());//sid  
  94.             }  
  95.             int totalWorkers = workers.size();  
  96.             int totalServers = current.keySet().size();  
  97.             int avg = totalWorkers / totalServers;//每个server实例可以接管任务的平均数  
  98.             while (true) {  
  99.                 Map.Entry<Integer, List<String>> gt = counterMap.higherEntry(avg);  //大于平均数的列表, >= avg + 1  
  100.                 Map.Entry<Integer, List<String>> lt = counterMap.lowerEntry(avg); //与平均数差值为2的 <= arg  - 1  
  101.                 //允许任务个数与avg上线浮动1各个,不是绝对的平均  
  102.   
  103.                 if (gt == null || lt == null) {  
  104.                     break;  
  105.                 }  
  106.                 Integer gtKey = gt.getKey();  
  107.                 Integer ltKey = lt.getKey();  
  108.                 if (gtKey - ltKey < 2) {  
  109.                     break;  
  110.                 }  
  111.                 if (gt.getValue().size() == 0) {  
  112.                     counterMap.remove(gt.getKey());  
  113.                 }  
  114.                 if (lt.getValue().size() == 0) {  
  115.                     counterMap.remove(lt.getKey());  
  116.                 }  
  117.                 Iterator<String> it = gt.getValue().iterator(); //sid列表  
  118.                 while (it.hasNext()) {  
  119.                     String _fromSid = it.next();  
  120.                     List<String> _currentWorkers = current.get(_fromSid);  
  121.                     if (_currentWorkers == null || _currentWorkers.isEmpty()) {  
  122.                         it.remove();  
  123.                         current.remove(_fromSid);  
  124.                         continue;  
  125.                     }  
  126.                     List<String> _ltServers = lt.getValue();  
  127.                     if (_ltServers.isEmpty()) {  
  128.                         counterMap.remove(ltKey);  
  129.                         break;  
  130.                     }  
  131.                     //取出需要交换出去的任务id  
  132.                     int _currentWorkersSize = _currentWorkers.size();  
  133.                     String _wid = _currentWorkers.get(_currentWorkersSize - 1);  
  134.                     String _toSid = _ltServers.get(0);  
  135.                     //从_fromSid的worker列表中移除低workerId  
  136.                     //注意:移除最后一个,和_ltWorkers.add(_wid)对应,_ltWorkers将新任务添加到list的尾部  
  137.                     //即从尾部移除,从尾部添加,基本保证"原任务,最少迁移次数"  
  138.                     _currentWorkers.remove(_currentWorkersSize - 1);  
  139.                     it.remove();  
  140.                     _ltServers.remove(0);  
  141.                     //将此workerId添加到_toSid的worker列表中  
  142.                     List<String> _ltWorkers = current.get(_toSid);  
  143.                     if (_ltWorkers == null) {  
  144.                         _ltWorkers = new ArrayList<String>();  
  145.                         current.put(_toSid, _ltWorkers);  
  146.                     }  
  147.                     _ltWorkers.add(_wid);  
  148.                     //将gt的key降低一个数字  
  149.                     List<String> _next = counterMap.get(gtKey - 1);  
  150.                     if (_next == null) {  
  151.                         _next = new ArrayList<String>();  
  152.                         counterMap.put(gtKey - 1, _next);  
  153.                     }  
  154.                     _next.add(_fromSid);  
  155.                     //将lt的key提升一个数字  
  156.                     List<String> _prev = counterMap.get(ltKey + 1);  
  157.                     //从lt的countMap中移除,因为它将被放置在key + 1的新位置  
  158.                     Iterator<String> _ltIt = _ltServers.iterator();  
  159.                     while (_ltIt.hasNext()) {  
  160.                         if (_ltIt.next().equalsIgnoreCase(_toSid)) {  
  161.                             _ltIt.remove();  
  162.                             break;  
  163.                         }  
  164.                     }  
  165.                     if (_prev == null) {  
  166.                         _prev = new ArrayList<String>();  
  167.                         counterMap.put(ltKey + 1, _prev);  
  168.                     }  
  169.                     _prev.add(_toSid);  
  170.                 }  
  171.             }  
  172.             //dump info  
  173.             for (Map.Entry<String, List<String>> entry : current.entrySet()) {  
  174.                 System.out.println("Sid:" + entry.getKey());  
  175.                 System.out.println(entry.getValue().toString());  
  176.             }  
  177.         } catch (Exception e) {  
  178.             e.printStackTrace();  
  179.         }  
  180.   
  181.     }  
  182.   
  183. }  

 

目录
相关文章
|
XML Android开发 数据格式
Android Keyboard(自定义输入法)
Keyboard的xml文件配置 软键盘的布局 自定义软键盘工具类 package com.
2125 0
|
人工智能 监控 供应链
AI技术创业有哪些机会?
本文探讨了AI技术创业的多个机会,包括提供行业解决方案、开发智能产品和服务以及教育和培训,为创业者在医疗保健、金融服务、零售、教育等多个领域提供了丰富的机遇。
689 2
|
11月前
|
人工智能 资源调度 自然语言处理
《探秘:人工智能算法与鸿蒙Next携手赋能元宇宙高并发用户交互》
在元宇宙的宏大蓝图中,高并发用户交互是实现沉浸式体验的关键。鸿蒙Next通过分布式架构、微内核优化、智能场景感知和ArkTS语言等技术,使人工智能算法能高效适配,实现计算资源的最优利用,支持大规模多人在线游戏、商务会议等场景下的流畅交互,推动元宇宙产业蓬勃发展。
285 17
|
12月前
|
设计模式 C# C++
适配器模式(Adapter Pattern)
适配器模式是一种结构型设计模式,通过将一个类的接口转换为客户期望的另一个接口,使原本接口不兼容的类可以一起工作。它包括目标接口、适配者和适配器三个核心角色。适配器模式常用于解决旧系统兼容性问题、第三方库整合和统一接口等场景。该模式有类适配器和对象适配器两种实现方式,分别通过继承和组合实现。适配器模式的优点包括提高兼容性、遵循开闭原则和灵活性高,但也存在适配器数量增加导致复杂性和可能影响性能的缺点。
|
移动开发 Java API
HTML 插件详解
HTML中的插件如Flash、Java applets和ActiveX控件曾广泛用于扩展网页功能,但因安全性问题和跨浏览器兼容性不佳而逐渐被淘汰。现代替代方案包括HTML5的`&lt;audio&gt;`、`&lt;video&gt;`、`&lt;canvas&gt;`和SVG等,以及WebAssembly和各种JavaScript API(如WebRTC和WebGL),这些新技术不仅提升了网页性能和安全性,还改善了用户体验。建议开发者优先采用HTML5和相关API。
|
存储 JavaScript Java
使用NekoHTML解析HTML并提取META标签内容
关于NekoHTML的代码样例,这里提供一个简单的示例,用于展示如何使用NekoHTML来解析HTML文档并提取其中的信息。请注意,由于NekoHTML的具体实现和API可能会随着版本更新而有所变化,以下代码仅供参考。 ### 示例:使用NekoHTML解析HTML并提取META标签内容 ```java import org.cyberneko.html.parsers.DOMParser; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.NodeList; import org.xml
259 1
|
数据采集 人工智能 数据管理
CDGA|信息差不再是障碍:数据治理新策略
在信息爆炸时代,数据成为企业宝贵资产,但数据量激增和来源多样化导致的信息差成为企业发展的障碍。为此,新的数据治理策略应运而生,通过构建统一的数据管理平台、强化数据治理体系、推动数据文化建设、利用AI与大数据技术优化治理,并注重合规性和隐私保护,确保数据质量、安全性和可访问性,消除信息差,提升企业竞争力和创新能力。
|
11月前
|
人工智能 自然语言处理 API
阿里云上的IaC和自动化
本文介绍了阿里云上的自动化与基础设施即代码(IaC)的整体情况。阿里云提供了2万多个API,每日调用量达300亿次,同比增长40%。文中探讨了自动化集成的方式,包括通过API、SDK和IaC工具,并分析了不同场景下的选择策略。对于资源管理较少的企业,控制台界面更合适;而对于高频变更和复杂操作,API和IaC是更好的选择。此外,文章还提到了低代码/无代码解决方案及AI在IaC和自动化中的应用前景。
|
网络协议 Linux 网络虚拟化
什么是 DHCP?为什么要使用它?
【8月更文挑战第4天】
9520 12
什么是 DHCP?为什么要使用它?
|
编解码 算法
为什么受损的视频数据通常显示为绿色?为什么很多30帧/秒的视频实际都是29.976帧/秒?
视频编码采用YUV格式因其亮度与色度分离,利于压缩且兼容黑白显示;受损视频常显绿色因YUV转RGB时Y、U、V为0导致;30帧/秒视频实为29.976帧/秒源于NTSC标准适应彩色电视需求;H.264等标准中H无特定含义,H.264又名MPEG-4 AVC,是ITU-T与ISO/IEC MPEG合作成果。