相对平均分布

简介:

有一个项目用来负责调度集群中的"cron任务",比如一个application中可以配置N个定时任务,这些任务信息最终注册到zookeeper上,并开发了一系列代码用于维护这些任务的"活性";当applicaton中一个server故障,那么这个server上接管的任务,需要迁移到其他server上,如果多个server存活的话,还需要这些任务能够"均衡"的分布.
其中"负载均衡",很好理解,比如有6个任务,3个server,那么就需要每个server上尽可能的运行2个任务;其实这个事情想起来很简单,但是做起来似乎有些不得不考虑的问题:

1) "相对平均"怎么设计
2) 迁移任务时,是否会丢失任务的触发时机;比如一个任务凌晨3点执行,刚好此时运行了一次"均衡",任务在原来的server上没有触发,在新的server上又过了时间..
3) 迁移任务时,还需要考虑"最少移动"次数,不能大面积迁移任务;只能从"负载高"的server上迁移到"负载低"的.

例如:

sid1: w1 w2 w3 w4
sid2: w5
sid3:w6

期望迁移之后:

sid1:w1 w2
sid2:w5 w3
sid3:w4 w6

而不是(这种结果迁移的面积太大,只需要把"多余"的任务迁移出去即可,而不是重新洗牌再均衡)

sid1:w6 w5
sid2:w2 w3
sid3:w1 w4

经过提取,"相对平均"的设计代码如下,仅作备忘:
Java代码

package com.test.demo.zookeeper;  
  
import java.io.BufferedReader;  
import java.io.InputStreamReader;  
import java.util.*;  
  
public class WorkersBalanceMain {  
    private List<String> servers = new ArrayList<String>();  
    private Map<String, List<String>> current = new HashMap<String, List<String>>();  
    private Set<String> workers = new HashSet<String>();  
  
    public static void main(String[] args) {  
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));  
        String line;  
        Set<String> servers = new HashSet<String>();  
        WorkersBalanceMain balancer = new WorkersBalanceMain();  
        try {  
            while ((line = br.readLine()) != null) {  
                if (line.startsWith("addWorker")) {  
                    balancer.addWorkers(line);  
                } else if (line.startsWith("addServer")) {  
                    balancer.addServers(line);  
                } else {  
                    System.out.println("???");  
                    continue;  
                }  
                balancer.rebalance();  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
        System.out.println("--END---");  
    }  
  
    public void addServers(String source) {  
        int index = source.indexOf(" ");  
        if (index == -1) {  
            return;  
        }  
        String[] values = source.substring(index + 1).split(" ");  
        if (values == null || values.length == 0) {  
            return;  
        }  
        for (String server : values) {  
            servers.add(server);  
            if(current.get(server) == null){  
                current.put(server,new ArrayList<String>());  
            }  
        }  
    }  
  
    public void addWorkers(String source) {  
        int index = source.indexOf(" ");  
        if (index == -1) {  
            return;  
        }  
        String[] values = source.substring(index + 1).split(" ");  
        if (values == null || values.length == 0) {  
            return;  
        }  
        //当有新的worker提交时,将咱有一台机器接管  
        String sid = servers.get(0);  
        List<String> sw = current.get(sid);  
        if(sw == null){  
            current.put(sid,new ArrayList<String>());  
        }  
        for (String worker : values) {  
            workers.add(worker);  
            sw.add(worker);  
        }  
  
    }  
  
    public void rebalance() {  
        try {  
            if (workers.isEmpty()) {  
                return;  
            }  
            for (String sid : servers) {  
                if (current.get(sid) == null) {  
                    current.put(sid, new ArrayList<String>());  
                }  
            }  
            //根据每个sid上的worker个数,整理成一个排序的map  
            TreeMap<Integer, List<String>> counterMap = new TreeMap<Integer, List<String>>();  
            for (Map.Entry<String, List<String>> entry : current.entrySet()) {  
                int total = entry.getValue().size();  
                List<String> sl = counterMap.get(total);  
                if (sl == null) {  
                    sl = new ArrayList<String>();  
                    counterMap.put(total, sl);  
                }  
                sl.add(entry.getKey());//sid  
            }  
            int totalWorkers = workers.size();  
            int totalServers = current.keySet().size();  
            int avg = totalWorkers / totalServers;//每个server实例可以接管任务的平均数  
            while (true) {  
                Map.Entry<Integer, List<String>> gt = counterMap.higherEntry(avg);  //大于平均数的列表, >= avg + 1  
                Map.Entry<Integer, List<String>> lt = counterMap.lowerEntry(avg); //与平均数差值为2的 <= arg  - 1  
                //允许任务个数与avg上线浮动1各个,不是绝对的平均  
  
                if (gt == null || lt == null) {  
                    break;  
                }  
                Integer gtKey = gt.getKey();  
                Integer ltKey = lt.getKey();  
                if (gtKey - ltKey < 2) {  
                    break;  
                }  
                if (gt.getValue().size() == 0) {  
                    counterMap.remove(gt.getKey());  
                }  
                if (lt.getValue().size() == 0) {  
                    counterMap.remove(lt.getKey());  
                }  
                Iterator<String> it = gt.getValue().iterator(); //sid列表  
                while (it.hasNext()) {  
                    String _fromSid = it.next();  
                    List<String> _currentWorkers = current.get(_fromSid);  
                    if (_currentWorkers == null || _currentWorkers.isEmpty()) {  
                        it.remove();  
                        current.remove(_fromSid);  
                        continue;  
                    }  
                    List<String> _ltServers = lt.getValue();  
                    if (_ltServers.isEmpty()) {  
                        counterMap.remove(ltKey);  
                        break;  
                    }  
                    //取出需要交换出去的任务id  
                    int _currentWorkersSize = _currentWorkers.size();  
                    String _wid = _currentWorkers.get(_currentWorkersSize - 1);  
                    String _toSid = _ltServers.get(0);  
                    //从_fromSid的worker列表中移除低workerId  
                    //注意:移除最后一个,和_ltWorkers.add(_wid)对应,_ltWorkers将新任务添加到list的尾部  
                    //即从尾部移除,从尾部添加,基本保证"原任务,最少迁移次数"  
                    _currentWorkers.remove(_currentWorkersSize - 1);  
                    it.remove();  
                    _ltServers.remove(0);  
                    //将此workerId添加到_toSid的worker列表中  
                    List<String> _ltWorkers = current.get(_toSid);  
                    if (_ltWorkers == null) {  
                        _ltWorkers = new ArrayList<String>();  
                        current.put(_toSid, _ltWorkers);  
                    }  
                    _ltWorkers.add(_wid);  
                    //将gt的key降低一个数字  
                    List<String> _next = counterMap.get(gtKey - 1);  
                    if (_next == null) {  
                        _next = new ArrayList<String>();  
                        counterMap.put(gtKey - 1, _next);  
                    }  
                    _next.add(_fromSid);  
                    //将lt的key提升一个数字  
                    List<String> _prev = counterMap.get(ltKey + 1);  
                    //从lt的countMap中移除,因为它将被放置在key + 1的新位置  
                    Iterator<String> _ltIt = _ltServers.iterator();  
                    while (_ltIt.hasNext()) {  
                        if (_ltIt.next().equalsIgnoreCase(_toSid)) {  
                            _ltIt.remove();  
                            break;  
                        }  
                    }  
                    if (_prev == null) {  
                        _prev = new ArrayList<String>();  
                        counterMap.put(ltKey + 1, _prev);  
                    }  
                    _prev.add(_toSid);  
                }  
            }  
            //dump info  
            for (Map.Entry<String, List<String>> entry : current.entrySet()) {  
                System.out.println("Sid:" + entry.getKey());  
                System.out.println(entry.getValue().toString());  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
  
    }  
  
}

原文链接:[http://wely.iteye.com/blog/2228792]

相关文章
|
6天前
|
云安全 人工智能 安全
AI被攻击怎么办?
阿里云提供 AI 全栈安全能力,其中对网络攻击的主动识别、智能阻断与快速响应构成其核心防线,依托原生安全防护为客户筑牢免疫屏障。
|
16天前
|
域名解析 人工智能
【实操攻略】手把手教学,免费领取.CN域名
即日起至2025年12月31日,购买万小智AI建站或云·企业官网,每单可免费领1个.CN域名首年!跟我了解领取攻略吧~
|
10天前
|
安全 Java Android开发
深度解析 Android 崩溃捕获原理及从崩溃到归因的闭环实践
崩溃堆栈全是 a.b.c?Native 错误查不到行号?本文详解 Android 崩溃采集全链路原理,教你如何把“天书”变“说明书”。RUM SDK 已支持一键接入。
621 216
|
存储 人工智能 监控
从代码生成到自主决策:打造一个Coding驱动的“自我编程”Agent
本文介绍了一种基于LLM的“自我编程”Agent系统,通过代码驱动实现复杂逻辑。该Agent以Python为执行引擎,结合Py4j实现Java与Python交互,支持多工具调用、记忆分层与上下文工程,具备感知、认知、表达、自我评估等能力模块,目标是打造可进化的“1.5线”智能助手。
860 61
|
8天前
|
人工智能 移动开发 自然语言处理
2025最新HTML静态网页制作工具推荐:10款免费在线生成器小白也能5分钟上手
晓猛团队精选2025年10款真正免费、无需编程的在线HTML建站工具,涵盖AI生成、拖拽编辑、设计稿转代码等多种类型,均支持浏览器直接使用、快速出图与文件导出,特别适合零基础用户快速搭建个人网站、落地页或企业官网。
1335 157
|
5天前
|
编解码 Linux 数据安全/隐私保护
教程分享免费视频压缩软件,免费视频压缩,视频压缩免费,附压缩方法及学习教程
教程分享免费视频压缩软件,免费视频压缩,视频压缩免费,附压缩方法及学习教程
249 138
|
7天前
|
存储 安全 固态存储
四款WIN PE工具,都可以实现U盘安装教程
Windows PE是基于NT内核的轻量系统,用于系统安装、分区管理及故障修复。本文推荐多款PE制作工具,支持U盘启动,兼容UEFI/Legacy模式,具备备份还原、驱动识别等功能,操作简便,适合新旧电脑维护使用。
550 109