【调度算法】NSGA III(2)

简介: 【调度算法】NSGA III

【调度算法】NSGA III(1)https://developer.aliyun.com/article/1541023

代码

参考:https://github.com/Xavier-MaYiMing/NSGA-III

import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
from itertools import combinations  # 创建和操作迭代器的工具
from scipy.linalg import LinAlgError    # 用于处理线性代数相关的错误
from scipy.spatial.distance import cdist    # 用于计算距离或相似性
def cal_obj(pop, nobj):
    '''
    :param pop: ndarray,决策变量的取值,形状为(N, D),N为种群中个体的数量,D为决策变量的数量
    :param nobj: int,目标函数的数量
    :return: ndarray,计算得到的目标函数值,形状为(N, nobj),N是种群中个体的数量,nobj是目标函数的数量。
    '''
    # 这里的函数是 DTLZ1 函数,用于多目标优化问题。g 是一个中间变量。
    g = 100 * (pop.shape[1] - nobj + 1 + np.sum(
        (pop[:, nobj - 1:] - 0.5) ** 2 - np.cos(20 * np.pi * (pop[:, nobj - 1:] - 0.5)), axis=1))
    objs = np.zeros((pop.shape[0], nobj))   # 创建一个大小为 (pop.shape[0], nobj) 的零矩阵,用于存储目标函数值。
    temp_pop = pop[:, : nobj - 1]   # 从输入的 pop 矩阵中取出前 nobj-1 列,存储在 temp_pop 中。
    for i in range(nobj):   # 遍历目标函数的数量(nobj)
        f = 0.5 * (1 + g)   # 计算 f,其中 g 是前面计算的中间变量。
        f *= np.prod(temp_pop[:, : temp_pop.shape[1] - i], axis=1)  # 使用累积乘法计算 f 的一部分,这部分与 temp_pop 相关。
        if i > 0:
            f *= 1 - temp_pop[:, temp_pop.shape[1] - i]     # 如果 i 大于 0,再乘以 1 减去 temp_pop 的一部分。
        objs[:, i] = f  # 将计算得到的 f 存储在目标函数矩阵的第 i 列。
    return objs
def factorial(n):
    # calculate n!
    if n == 0 or n == 1:
        return 1
    else:
        return n * factorial(n - 1)
def combination(n, m):
    # choose m elements from an n-length set
    # 计算排列组合数C(n,m)
    if m == 0 or m == n:
        return 1
    elif m > n:
        return 0
    else:
        return factorial(n) // (factorial(m) * factorial(n - m))
def reference_points(npop, nvar):
    '''
    calculate approximately npop uniformly distributed reference points on nvar dimensions
    :param npop: int,要生成的均匀分布的参考点的数量
    :param nvar: int,参考点的维度
    :return: ndarray,所生成的均匀分布的参考点的坐标,形状为(npop, nvar)
    '''
    h1 = 0  # 用于控制循环的计数器
    while combination(h1 + nvar, nvar - 1) <= npop: # 目的是确定一个足够大的h1,以便在nvar-1维度上有足够的均匀分布的参考点。
        h1 += 1
    # 使用组合数的计算结果,构建一个 points 数组,这个数组包含了 h1 维度中的参考点坐标
    points = np.array(list(combinations(np.arange(1, h1 + nvar), nvar - 1))) - np.arange(nvar - 1) - 1
    # 对 points 数组中的坐标进行变换,以确保它们在 [0, 1] 范围内均匀分布
    points = (np.concatenate((points, np.zeros((points.shape[0], 1)) + h1), axis=1) - np.concatenate((np.zeros((points.shape[0], 1)), points), axis=1)) / h1
    if h1 < nvar:   # 如果h1不足以在nvar-1维度上生成足够多的参考点
        h2 = 0
        # h2 的值,以便在nvar-1维度上有足够多的均匀分布的参考点。这些参考点将与之前的points组合在一起。
        while combination(h1 + nvar - 1, nvar - 1) + combination(h2 + nvar, nvar - 1) <= npop:
            h2 += 1
        if h2 > 0:
            # 使用上边类似的方式,构建temp_points数组,然后进行坐标变换
            temp_points = np.array(list(combinations(np.arange(1, h2 + nvar), nvar - 1))) - np.arange(nvar - 1) - 1
            temp_points = (np.concatenate((temp_points, np.zeros((temp_points.shape[0], 1)) + h2), axis=1) - np.concatenate((np.zeros((temp_points.shape[0], 1)), temp_points), axis=1)) / h2
            temp_points = temp_points / 2 + 1 / (2 * nvar)
            # 将temp_points添加到points中,以得到最终的参考点数组,并将其返回
            points = np.concatenate((points, temp_points), axis=0)
    return points
def nd_sort(objs):
    """
    fast non-domination sort
    :param objs: ndarray,种群中每个个体的目标函数值,形状为(npop, nobj),其中npop是种群中个体的数量,nobj是目标函数的数量。
    :return pfs: dict,键表示非支配级别,对应的值是一个包含相应级别的Pareto前沿中的个体索引的列表。
    :return rank: ndarray,每个个体的非支配级别,形状为 (npop,),其中npop是种群中个体的数量。rank数组指示了每个个体所属的非支配级别
    """
    (npop, nobj) = objs.shape
    n = np.zeros(npop, dtype=int)  # the number of individuals that dominate this individual
    s = []  # the index of individuals that dominated by this individual
    rank = np.zeros(npop, dtype=int)
    ind = 0
    pfs = {ind: []}  # Pareto fronts
    for i in range(npop):
        s.append([])
        for j in range(npop):
            if i != j:
                less = equal = more = 0
                for k in range(nobj):
                    if objs[i, k] < objs[j, k]:
                        less += 1
                    elif objs[i, k] == objs[j, k]:
                        equal += 1
                    else:
                        more += 1
                if less == 0 and equal != nobj:
                    n[i] += 1
                elif more == 0 and equal != nobj:
                    s[i].append(j)
        if n[i] == 0:
            pfs[ind].append(i)
            rank[i] = ind
    while pfs[ind]:
        pfs[ind + 1] = []
        for i in pfs[ind]:
            for j in s[i]:
                n[j] -= 1
                if n[j] == 0:
                    pfs[ind + 1].append(j)
                    rank[j] = ind + 1
        ind += 1
    pfs.pop(ind)
    return pfs, rank
def selection(pop, pc, rank, k=2):
    """
    binary tournament selection
    :param pop: ndarray,种群中每个个体的决策变量值,形状为(npop,nvar),即(种群中个体数量,决策变量数量)
    :param pc: float,选择概率
    :param rank: ndarray,每个个体的非支配级别,形状为(npop,),即(种群中个体数,)
    :param k: 锦标赛选择中参与竞争的个体数量
    :return: ndarray,选择后得到的用于繁殖的个体的决策变量值,形状为(nm,nvar),nm为选择个体数量,通常等于npop*pc,确保为偶数
    """
    (npop, nvar) = pop.shape
    nm = int(npop * pc)
    nm = nm if nm % 2 == 0 else nm + 1
    mating_pool = np.zeros((nm, nvar))
    for i in range(nm):
        [ind1, ind2] = np.random.choice(npop, k, replace=False)
        if rank[ind1] <= rank[ind2]:
            mating_pool[i] = pop[ind1]
        else:
            mating_pool[i] = pop[ind2]
    return mating_pool
def crossover(mating_pool, lb, ub, pc, eta_c):
    """
    simulated binary crossover (SBX) 模拟二进制交叉
    :param mating_pool: ndarray用于繁殖的个体的决策变量值,形状(noff,nvar),即(要繁殖的个体数,决策变量数)
    :param lb: ndarray,决策变量下界(lower bound),形状(nvar,)
    :param ub: ndarray,决策变量上界(upper bound),形状(nvar,)
    :param pc: float,交叉概率
    :param eta_c: int,扩散因子分布指数,用于控制模拟二进制交叉的分布情况,值越大(>10),分布越均匀
    :return: ndarray,交叉结果,形状为(noff, nvar)
    """
    (noff, nvar) = mating_pool.shape
    nm = int(noff / 2)
    parent1 = mating_pool[:nm]  #拆分
    parent2 = mating_pool[nm:]
    beta = np.zeros((nm, nvar))
    mu = np.random.random((nm, nvar))
    flag1 = mu <= 0.5
    flag2 = ~flag1
    beta[flag1] = (2 * mu[flag1]) ** (1 / (eta_c + 1))
    beta[flag2] = (2 - 2 * mu[flag2]) ** (-1 / (eta_c + 1))
    beta = beta * (-1) ** np.random.randint(0, 2, (nm, nvar))
    beta[np.random.random((nm, nvar)) < 0.5] = 1
    beta[np.tile(np.random.random((nm, 1)) > pc, (1, nvar))] = 1
    offspring1 = (parent1 + parent2) / 2 + beta * (parent1 - parent2) / 2   # 交叉
    offspring2 = (parent1 + parent2) / 2 - beta * (parent1 - parent2) / 2
    offspring = np.concatenate((offspring1, offspring2), axis=0)    # 重新拼接
    offspring = np.min((offspring, np.tile(ub, (noff, 1))), axis=0)
    offspring = np.max((offspring, np.tile(lb, (noff, 1))), axis=0)
    return offspring
def mutation(pop, lb, ub, pm, eta_m):
    """
    polynomial mutation 多项式变异
    :param pop: ndarray用于繁殖的个体的决策变量值,形状(noff,nvar),即(要繁殖的个体数,决策变量数)
    :param lb: ndarray,决策变量下界(lower bound),形状(nvar,)
    :param ub: ndarray,决策变量上界(upper bound),形状(nvar,)
    :param pm: float,变异概率
    :param eta_m: 扰动因子分布指数,用于控制多项式变异的分布形状,值很大时,变异幅度较小,变异的形状更趋于均匀分布
    :return: ndarray,交叉结果,形状为(noff, nvar)
    """
    (npop, nvar) = pop.shape
    lb = np.tile(lb, (npop, 1))
    ub = np.tile(ub, (npop, 1))
    site = np.random.random((npop, nvar)) < pm / nvar
    mu = np.random.random((npop, nvar))
    delta1 = (pop - lb) / (ub - lb)
    delta2 = (ub - pop) / (ub - lb)
    temp = np.logical_and(site, mu <= 0.5)
    pop[temp] += (ub[temp] - lb[temp]) * ((2 * mu[temp] + (1 - 2 * mu[temp]) * (1 - delta1[temp]) ** (eta_m + 1)) ** (1 / (eta_m + 1)) - 1)
    temp = np.logical_and(site, mu > 0.5)
    pop[temp] += (ub[temp] - lb[temp]) * (1 - (2 * (1 - mu[temp]) + 2 * (mu[temp] - 0.5) * (1 - delta2[temp]) ** (eta_m + 1)) ** (1 / (eta_m + 1)))
    pop = np.min((pop, ub), axis=0)
    pop = np.max((pop, lb), axis=0)
    return pop
def environmental_selection(pop, objs, zmin, npop, V):
    """
    NSGA-III environmental selection
    :param pop: ndarray,用于繁殖的个体的决策变量值,形状(noff,nvar),即(要繁殖的个体数,决策变量数)
    :param objs: ndarray,种群中每个个体的目标函数值,形状为 (npop, nobj),nobj 是目标函数的数量。
    :param zmin: ndarray,每个目标函数的最小值。它的形状为 (nobj,)
    :param npop: int,环境选择后要保留的个体数量
    :param V: 权向量,用于计算多目标优化中的 Pareto 前沿,形状通常是 (nv, nobj),即(权向量的数量,目标函数的数量)每个权向量代表一种目标函数权重的组合
    :return:
    """
    pfs, rank = nd_sort(objs)
    nobj = objs.shape[1]
    selected = np.full(pop.shape[0], False)
    ind = 0
    while np.sum(selected) + len(pfs[ind]) <= npop:
        selected[pfs[ind]] = True
        ind += 1
    K = npop - np.sum(selected)
    # select the remaining K solutions
    objs1 = objs[selected]
    objs2 = objs[pfs[ind]]
    npop1 = objs1.shape[0]
    npop2 = objs2.shape[0]
    nv = V.shape[0]
    temp_objs = np.concatenate((objs1, objs2), axis=0)
    t_objs = temp_objs - zmin
    # extreme points
    extreme = np.zeros(nobj)
    w = 1e-6 + np.eye(nobj)
    for i in range(nobj):
        extreme[i] = np.argmin(np.max(t_objs / w[i], axis=1))
    # intercepts
    try:
        hyperplane = np.matmul(np.linalg.inv(t_objs[extreme.astype(int)]), np.ones((nobj, 1)))
        if np.any(hyperplane == 0):
            a = np.max(t_objs, axis=0)
        else:
            a = 1 / hyperplane
    except LinAlgError:
        a = np.max(t_objs, axis=0)
    t_objs /= a.reshape(1, nobj)
    # association
    cosine = 1 - cdist(t_objs, V, 'cosine')
    distance = np.sqrt(np.sum(t_objs ** 2, axis=1).reshape(npop1 + npop2, 1)) * np.sqrt(1 - cosine ** 2)
    dis = np.min(distance, axis=1)
    association = np.argmin(distance, axis=1)
    temp_rho = dict(Counter(association[: npop1]))
    rho = np.zeros(nv)
    for key in temp_rho.keys():
        rho[key] = temp_rho[key]
    # selection
    choose = np.full(npop2, False)
    v_choose = np.full(nv, True)
    while np.sum(choose) < K:
        temp = np.where(v_choose)[0]
        jmin = np.where(rho[temp] == np.min(rho[temp]))[0]
        j = temp[np.random.choice(jmin)]
        I = np.where(np.bitwise_and(~choose, association[npop1:] == j))[0]
        if I.size > 0:
            if rho[j] == 0:
                s = np.argmin(dis[npop1 + I])
            else:
                s = np.random.randint(I.size)
            choose[I[s]] = True
            rho[j] += 1
        else:
            v_choose[j] = False
    selected[np.array(pfs[ind])[choose]] = True
    return pop[selected], objs[selected], rank[selected]
def main(npop, iter, lb, ub, nobj=3, pc=1, pm=1, eta_c=30, eta_m=20):
    """
    The main function
    :param npop: population size
    :param iter: iteration number
    :param lb: lower bound
    :param ub: upper bound
    :param nobj: the dimension of objective space
    :param pc: crossover probability (default = 1)
    :param pm: mutation probability (default = 1)
    :param eta_c: spread factor distribution index (default = 30) 扩散因子分布指数
    :param eta_m: perturbance factor distribution index (default = 20) 扰动因子分布指数
    :return:
    """
    # Step 1. Initialization
    nvar = len(lb)  # the dimension of decision space
    pop = np.random.uniform(lb, ub, (npop, nvar))  # population
    objs = cal_obj(pop, nobj)  # objectives
    V = reference_points(npop, nobj)  # reference vectors
    zmin = np.min(objs, axis=0)  # ideal points
    [pfs, rank] = nd_sort(objs)  # Pareto rank
    # Step 2. The main loop
    for t in range(iter):
        if (t + 1) % 50 == 0:
            print('Iteration: ' + str(t + 1) + ' completed.')
        # Step 2.1. Mating selection + crossover + mutation
        mating_pool = selection(pop, pc, rank)
        off = crossover(mating_pool, lb, ub, pc, eta_c)
        off = mutation(off, lb, ub, pm, eta_m)
        off_objs = cal_obj(off, nobj)
        # Step 2.2. Environmental selection
        zmin = np.min((zmin, np.min(off_objs, axis=0)), axis=0)
        pop, objs, rank = environmental_selection(np.concatenate((pop, off), axis=0), np.concatenate((objs, off_objs), axis=0), zmin, npop, V)
    # Step 3. Sort the results
    pf = objs[rank == 0]
    ax = plt.figure().add_subplot(111, projection='3d')
    ax.view_init(45, 45)
    x = [o[0] for o in pf]
    y = [o[1] for o in pf]
    z = [o[2] for o in pf]
    ax.scatter(x, y, z, color='red')
    ax.set_xlabel('objective 1')
    ax.set_ylabel('objective 2')
    ax.set_zlabel('objective 3')
    plt.title('The Pareto front of DTLZ1')
    plt.savefig('Pareto front')
    plt.show()
if __name__ == '__main__':
    main(91, 400, np.array([0] * 7), np.array([1] * 7))
目录
相关文章
|
1月前
|
算法 调度 Python
【调度算法】并行机调度问题遗传算法
【调度算法】并行机调度问题遗传算法
33 2
|
1月前
|
算法 测试技术 调度
【调度算法】DTLZ问题家族
【调度算法】DTLZ问题家族
20 1
|
1月前
|
算法 数据挖掘 调度
【调度算法】NSGA III(1)
【调度算法】NSGA III
76 0
|
9天前
|
机器学习/深度学习 算法 物联网
深入剖析操作系统调度算法
【7月更文挑战第13天】本文旨在探讨和比较不同的操作系统调度算法,并分析其对系统性能的影响。文章首先概述了调度算法的基本概念及其重要性,随后详细阐述了常见的调度算法类型,包括先来先服务、短作业优先、优先级调度、时间片轮转以及多级反馈队列等。通过对比不同算法的优缺点,文章进一步探讨了现代操作系统中调度算法的应用与挑战,以及如何根据实际需求选择合适的调度策略。最后,文章展望了操作系统调度算法的未来发展方向,特别是在云计算和物联网时代下的适应性与创新。
16 1
|
23天前
|
算法 调度 云计算
操作系统中的调度算法:从理论到实践
在计算机科学领域,操作系统的调度算法是决定任务执行顺序的关键。本文首先概述了调度算法的基本概念和重要性,随后深入探讨了几种主要的调度算法,包括先来先服务、短作业优先、轮转与优先级调度等。通过引用最新的科研数据和实验证据,文章揭示了不同调度算法的性能表现和适用场景。此外,本文还讨论了现代操作系统中调度算法面临的挑战和未来的发展方向,强调了在多核处理器和云计算环境下调度策略的复杂性。最后,通过案例分析,展示了如何在实际系统中应用这些理论知识,以及在设计高效调度系统时需要考虑的因素。
|
1月前
|
算法 调度
基于变异混合蛙跳算法的车间调度最优化matlab仿真,可以任意调整工件数和机器数,输出甘特图
**摘要:** 实现变异混合蛙跳算法的MATLAB2022a版车间调度优化程序,支持动态调整工件和机器数,输出甘特图。核心算法结合SFLA与变异策略,解决Job-Shop Scheduling Problem,最小化总完成时间。SFLA模拟蛙群行为,分组进行局部搜索和全局信息交换。变异策略增强全局探索,避免局部最优。程序初始化随机解,按规则更新,经多次迭代和信息交换后终止。
|
1月前
|
算法 调度
【调度算法】Boltzmann选择
【调度算法】Boltzmann选择
45 1
|
1月前
|
算法 调度 Python
【调度算法】开放车间调度问题遗传算法
【调度算法】开放车间调度问题遗传算法
17 1
|
1月前
|
算法 调度
【调度算法】NSGA II
【调度算法】NSGA II
34 1
|
18天前
|
机器学习/深度学习 算法 数据挖掘
操作系统调度算法的演进与性能分析
随着计算机科学的发展,操作系统作为硬件与软件之间的桥梁,其调度算法对系统性能有着举足轻重的影响。本文将探讨操作系统中调度算法的演变,从早期的简单调度策略到现代复杂的多级反馈队列和实时调度机制,并结合最新研究和实验数据,深入分析不同调度算法对系统吞吐量、响应时间及资源利用率的影响。通过对调度算法性能的定量评估,本文旨在为系统设计者提供优化决策的理论依据,同时为未来调度算法的研究指明方向。
15 0