相对平均分布

简介:

有一个项目用来负责调度集群中的"cron任务",比如一个application中可以配置N个定时任务,这些任务信息最终注册到zookeeper上,并开发了一系列代码用于维护这些任务的"活性";当applicaton中一个server故障,那么这个server上接管的任务,需要迁移到其他server上,如果多个server存活的话,还需要这些任务能够"均衡"的分布.
其中"负载均衡",很好理解,比如有6个任务,3个server,那么就需要每个server上尽可能的运行2个任务;其实这个事情想起来很简单,但是做起来似乎有些不得不考虑的问题:

1) "相对平均"怎么设计
2) 迁移任务时,是否会丢失任务的触发时机;比如一个任务凌晨3点执行,刚好此时运行了一次"均衡",任务在原来的server上没有触发,在新的server上又过了时间..
3) 迁移任务时,还需要考虑"最少移动"次数,不能大面积迁移任务;只能从"负载高"的server上迁移到"负载低"的.

例如:

sid1: w1 w2 w3 w4
sid2: w5
sid3:w6

期望迁移之后:

sid1:w1 w2
sid2:w5 w3
sid3:w4 w6

而不是(这种结果迁移的面积太大,只需要把"多余"的任务迁移出去即可,而不是重新洗牌再均衡)

sid1:w6 w5
sid2:w2 w3
sid3:w1 w4

经过提取,"相对平均"的设计代码如下,仅作备忘:
Java代码

package com.test.demo.zookeeper;  
  
import java.io.BufferedReader;  
import java.io.InputStreamReader;  
import java.util.*;  
  
public class WorkersBalanceMain {  
    private List<String> servers = new ArrayList<String>();  
    private Map<String, List<String>> current = new HashMap<String, List<String>>();  
    private Set<String> workers = new HashSet<String>();  
  
    public static void main(String[] args) {  
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));  
        String line;  
        Set<String> servers = new HashSet<String>();  
        WorkersBalanceMain balancer = new WorkersBalanceMain();  
        try {  
            while ((line = br.readLine()) != null) {  
                if (line.startsWith("addWorker")) {  
                    balancer.addWorkers(line);  
                } else if (line.startsWith("addServer")) {  
                    balancer.addServers(line);  
                } else {  
                    System.out.println("???");  
                    continue;  
                }  
                balancer.rebalance();  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
        System.out.println("--END---");  
    }  
  
    public void addServers(String source) {  
        int index = source.indexOf(" ");  
        if (index == -1) {  
            return;  
        }  
        String[] values = source.substring(index + 1).split(" ");  
        if (values == null || values.length == 0) {  
            return;  
        }  
        for (String server : values) {  
            servers.add(server);  
            if(current.get(server) == null){  
                current.put(server,new ArrayList<String>());  
            }  
        }  
    }  
  
    public void addWorkers(String source) {  
        int index = source.indexOf(" ");  
        if (index == -1) {  
            return;  
        }  
        String[] values = source.substring(index + 1).split(" ");  
        if (values == null || values.length == 0) {  
            return;  
        }  
        //当有新的worker提交时,将咱有一台机器接管  
        String sid = servers.get(0);  
        List<String> sw = current.get(sid);  
        if(sw == null){  
            current.put(sid,new ArrayList<String>());  
        }  
        for (String worker : values) {  
            workers.add(worker);  
            sw.add(worker);  
        }  
  
    }  
  
    public void rebalance() {  
        try {  
            if (workers.isEmpty()) {  
                return;  
            }  
            for (String sid : servers) {  
                if (current.get(sid) == null) {  
                    current.put(sid, new ArrayList<String>());  
                }  
            }  
            //根据每个sid上的worker个数,整理成一个排序的map  
            TreeMap<Integer, List<String>> counterMap = new TreeMap<Integer, List<String>>();  
            for (Map.Entry<String, List<String>> entry : current.entrySet()) {  
                int total = entry.getValue().size();  
                List<String> sl = counterMap.get(total);  
                if (sl == null) {  
                    sl = new ArrayList<String>();  
                    counterMap.put(total, sl);  
                }  
                sl.add(entry.getKey());//sid  
            }  
            int totalWorkers = workers.size();  
            int totalServers = current.keySet().size();  
            int avg = totalWorkers / totalServers;//每个server实例可以接管任务的平均数  
            while (true) {  
                Map.Entry<Integer, List<String>> gt = counterMap.higherEntry(avg);  //大于平均数的列表, >= avg + 1  
                Map.Entry<Integer, List<String>> lt = counterMap.lowerEntry(avg); //与平均数差值为2的 <= arg  - 1  
                //允许任务个数与avg上线浮动1各个,不是绝对的平均  
  
                if (gt == null || lt == null) {  
                    break;  
                }  
                Integer gtKey = gt.getKey();  
                Integer ltKey = lt.getKey();  
                if (gtKey - ltKey < 2) {  
                    break;  
                }  
                if (gt.getValue().size() == 0) {  
                    counterMap.remove(gt.getKey());  
                }  
                if (lt.getValue().size() == 0) {  
                    counterMap.remove(lt.getKey());  
                }  
                Iterator<String> it = gt.getValue().iterator(); //sid列表  
                while (it.hasNext()) {  
                    String _fromSid = it.next();  
                    List<String> _currentWorkers = current.get(_fromSid);  
                    if (_currentWorkers == null || _currentWorkers.isEmpty()) {  
                        it.remove();  
                        current.remove(_fromSid);  
                        continue;  
                    }  
                    List<String> _ltServers = lt.getValue();  
                    if (_ltServers.isEmpty()) {  
                        counterMap.remove(ltKey);  
                        break;  
                    }  
                    //取出需要交换出去的任务id  
                    int _currentWorkersSize = _currentWorkers.size();  
                    String _wid = _currentWorkers.get(_currentWorkersSize - 1);  
                    String _toSid = _ltServers.get(0);  
                    //从_fromSid的worker列表中移除低workerId  
                    //注意:移除最后一个,和_ltWorkers.add(_wid)对应,_ltWorkers将新任务添加到list的尾部  
                    //即从尾部移除,从尾部添加,基本保证"原任务,最少迁移次数"  
                    _currentWorkers.remove(_currentWorkersSize - 1);  
                    it.remove();  
                    _ltServers.remove(0);  
                    //将此workerId添加到_toSid的worker列表中  
                    List<String> _ltWorkers = current.get(_toSid);  
                    if (_ltWorkers == null) {  
                        _ltWorkers = new ArrayList<String>();  
                        current.put(_toSid, _ltWorkers);  
                    }  
                    _ltWorkers.add(_wid);  
                    //将gt的key降低一个数字  
                    List<String> _next = counterMap.get(gtKey - 1);  
                    if (_next == null) {  
                        _next = new ArrayList<String>();  
                        counterMap.put(gtKey - 1, _next);  
                    }  
                    _next.add(_fromSid);  
                    //将lt的key提升一个数字  
                    List<String> _prev = counterMap.get(ltKey + 1);  
                    //从lt的countMap中移除,因为它将被放置在key + 1的新位置  
                    Iterator<String> _ltIt = _ltServers.iterator();  
                    while (_ltIt.hasNext()) {  
                        if (_ltIt.next().equalsIgnoreCase(_toSid)) {  
                            _ltIt.remove();  
                            break;  
                        }  
                    }  
                    if (_prev == null) {  
                        _prev = new ArrayList<String>();  
                        counterMap.put(ltKey + 1, _prev);  
                    }  
                    _prev.add(_toSid);  
                }  
            }  
            //dump info  
            for (Map.Entry<String, List<String>> entry : current.entrySet()) {  
                System.out.println("Sid:" + entry.getKey());  
                System.out.println(entry.getValue().toString());  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
  
    }  
  
}

原文链接:[http://wely.iteye.com/blog/2228792]

相关文章
|
Python
Python 金融量化 均线系统交易策略专题(简单移动平均,加权移动平均,指数加权移动平均,异同移动平均MACD等解读与绘图)
Python 金融量化 均线系统交易策略专题(简单移动平均,加权移动平均,指数加权移动平均,异同移动平均MACD等解读与绘图)
1187 0
Python 金融量化 均线系统交易策略专题(简单移动平均,加权移动平均,指数加权移动平均,异同移动平均MACD等解读与绘图)
|
9月前
|
机器学习/深度学习 监控 数据挖掘
数据并非都是正态分布:三种常见的统计分布及其应用
这篇文章除了介绍线性模型在减肥app预测中的不切实际性,还探讨了不同统计分布在体重管理和数据分析中的应用。文章提到了正态分布和泊松分布,前者常用于描述围绕平均值对称分布的连续数据,如体重;后者适合计数数据,如体重变化次数。正态分布以其钟形曲线闻名,泊松分布则描述独立事件的数量。文章还简要介绍了卡方分布在检验分类变量关系时的作用。最后,文章指出了在线性回归中假设数据正态分布的原因,包括便于统计推断和最小化估计误差。
786 5
【数理统计实验(一)】统计量近似分布的随机模拟
【数理统计实验(一)】统计量近似分布的随机模拟
|
10月前
|
数据可视化 C语言
使用R语言随机波动模型SV处理时间序列中的随机波动率
使用R语言随机波动模型SV处理时间序列中的随机波动率
|
10月前
极值分析:分块极大值BLOCK-MAXIMA、阈值超额法、广义帕累托分布GPD拟合降雨数据时间序列
极值分析:分块极大值BLOCK-MAXIMA、阈值超额法、广义帕累托分布GPD拟合降雨数据时间序列
极值分析:分块极大值BLOCK-MAXIMA、阈值超额法、广义帕累托分布GPD拟合降雨数据时间序列
|
10月前
919: 我们被平均了
919: 我们被平均了
|
10月前
R语言量化:合成波动率指数移动平均策略分析标准普尔500波动率指数(VIX)
R语言量化:合成波动率指数移动平均策略分析标准普尔500波动率指数(VIX)
三大抽样分布——卡方分布、t分布、F分布
三大抽样分布——卡方分布、t分布、F分布
HIMA 984865165 反映了分数带宽与频率增加的对数
HIMA 984865165 反映了分数带宽与频率增加的对数
HIMA 984865165 反映了分数带宽与频率增加的对数

热门文章

最新文章