【调度算法】NSGA III(2)

简介: 【调度算法】NSGA III

【调度算法】NSGA III(1)https://developer.aliyun.com/article/1541023

代码

参考:https://github.com/Xavier-MaYiMing/NSGA-III

import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
from itertools import combinations  # 创建和操作迭代器的工具
from scipy.linalg import LinAlgError    # 用于处理线性代数相关的错误
from scipy.spatial.distance import cdist    # 用于计算距离或相似性
def cal_obj(pop, nobj):
    '''
    :param pop: ndarray,决策变量的取值,形状为(N, D),N为种群中个体的数量,D为决策变量的数量
    :param nobj: int,目标函数的数量
    :return: ndarray,计算得到的目标函数值,形状为(N, nobj),N是种群中个体的数量,nobj是目标函数的数量。
    '''
    # 这里的函数是 DTLZ1 函数,用于多目标优化问题。g 是一个中间变量。
    g = 100 * (pop.shape[1] - nobj + 1 + np.sum(
        (pop[:, nobj - 1:] - 0.5) ** 2 - np.cos(20 * np.pi * (pop[:, nobj - 1:] - 0.5)), axis=1))
    objs = np.zeros((pop.shape[0], nobj))   # 创建一个大小为 (pop.shape[0], nobj) 的零矩阵,用于存储目标函数值。
    temp_pop = pop[:, : nobj - 1]   # 从输入的 pop 矩阵中取出前 nobj-1 列,存储在 temp_pop 中。
    for i in range(nobj):   # 遍历目标函数的数量(nobj)
        f = 0.5 * (1 + g)   # 计算 f,其中 g 是前面计算的中间变量。
        f *= np.prod(temp_pop[:, : temp_pop.shape[1] - i], axis=1)  # 使用累积乘法计算 f 的一部分,这部分与 temp_pop 相关。
        if i > 0:
            f *= 1 - temp_pop[:, temp_pop.shape[1] - i]     # 如果 i 大于 0,再乘以 1 减去 temp_pop 的一部分。
        objs[:, i] = f  # 将计算得到的 f 存储在目标函数矩阵的第 i 列。
    return objs
def factorial(n):
    # calculate n!
    if n == 0 or n == 1:
        return 1
    else:
        return n * factorial(n - 1)
def combination(n, m):
    # choose m elements from an n-length set
    # 计算排列组合数C(n,m)
    if m == 0 or m == n:
        return 1
    elif m > n:
        return 0
    else:
        return factorial(n) // (factorial(m) * factorial(n - m))
def reference_points(npop, nvar):
    '''
    calculate approximately npop uniformly distributed reference points on nvar dimensions
    :param npop: int,要生成的均匀分布的参考点的数量
    :param nvar: int,参考点的维度
    :return: ndarray,所生成的均匀分布的参考点的坐标,形状为(npop, nvar)
    '''
    h1 = 0  # 用于控制循环的计数器
    while combination(h1 + nvar, nvar - 1) <= npop: # 目的是确定一个足够大的h1,以便在nvar-1维度上有足够的均匀分布的参考点。
        h1 += 1
    # 使用组合数的计算结果,构建一个 points 数组,这个数组包含了 h1 维度中的参考点坐标
    points = np.array(list(combinations(np.arange(1, h1 + nvar), nvar - 1))) - np.arange(nvar - 1) - 1
    # 对 points 数组中的坐标进行变换,以确保它们在 [0, 1] 范围内均匀分布
    points = (np.concatenate((points, np.zeros((points.shape[0], 1)) + h1), axis=1) - np.concatenate((np.zeros((points.shape[0], 1)), points), axis=1)) / h1
    if h1 < nvar:   # 如果h1不足以在nvar-1维度上生成足够多的参考点
        h2 = 0
        # h2 的值,以便在nvar-1维度上有足够多的均匀分布的参考点。这些参考点将与之前的points组合在一起。
        while combination(h1 + nvar - 1, nvar - 1) + combination(h2 + nvar, nvar - 1) <= npop:
            h2 += 1
        if h2 > 0:
            # 使用上边类似的方式,构建temp_points数组,然后进行坐标变换
            temp_points = np.array(list(combinations(np.arange(1, h2 + nvar), nvar - 1))) - np.arange(nvar - 1) - 1
            temp_points = (np.concatenate((temp_points, np.zeros((temp_points.shape[0], 1)) + h2), axis=1) - np.concatenate((np.zeros((temp_points.shape[0], 1)), temp_points), axis=1)) / h2
            temp_points = temp_points / 2 + 1 / (2 * nvar)
            # 将temp_points添加到points中,以得到最终的参考点数组,并将其返回
            points = np.concatenate((points, temp_points), axis=0)
    return points
def nd_sort(objs):
    """
    fast non-domination sort
    :param objs: ndarray,种群中每个个体的目标函数值,形状为(npop, nobj),其中npop是种群中个体的数量,nobj是目标函数的数量。
    :return pfs: dict,键表示非支配级别,对应的值是一个包含相应级别的Pareto前沿中的个体索引的列表。
    :return rank: ndarray,每个个体的非支配级别,形状为 (npop,),其中npop是种群中个体的数量。rank数组指示了每个个体所属的非支配级别
    """
    (npop, nobj) = objs.shape
    n = np.zeros(npop, dtype=int)  # the number of individuals that dominate this individual
    s = []  # the index of individuals that dominated by this individual
    rank = np.zeros(npop, dtype=int)
    ind = 0
    pfs = {ind: []}  # Pareto fronts
    for i in range(npop):
        s.append([])
        for j in range(npop):
            if i != j:
                less = equal = more = 0
                for k in range(nobj):
                    if objs[i, k] < objs[j, k]:
                        less += 1
                    elif objs[i, k] == objs[j, k]:
                        equal += 1
                    else:
                        more += 1
                if less == 0 and equal != nobj:
                    n[i] += 1
                elif more == 0 and equal != nobj:
                    s[i].append(j)
        if n[i] == 0:
            pfs[ind].append(i)
            rank[i] = ind
    while pfs[ind]:
        pfs[ind + 1] = []
        for i in pfs[ind]:
            for j in s[i]:
                n[j] -= 1
                if n[j] == 0:
                    pfs[ind + 1].append(j)
                    rank[j] = ind + 1
        ind += 1
    pfs.pop(ind)
    return pfs, rank
def selection(pop, pc, rank, k=2):
    """
    binary tournament selection
    :param pop: ndarray,种群中每个个体的决策变量值,形状为(npop,nvar),即(种群中个体数量,决策变量数量)
    :param pc: float,选择概率
    :param rank: ndarray,每个个体的非支配级别,形状为(npop,),即(种群中个体数,)
    :param k: 锦标赛选择中参与竞争的个体数量
    :return: ndarray,选择后得到的用于繁殖的个体的决策变量值,形状为(nm,nvar),nm为选择个体数量,通常等于npop*pc,确保为偶数
    """
    (npop, nvar) = pop.shape
    nm = int(npop * pc)
    nm = nm if nm % 2 == 0 else nm + 1
    mating_pool = np.zeros((nm, nvar))
    for i in range(nm):
        [ind1, ind2] = np.random.choice(npop, k, replace=False)
        if rank[ind1] <= rank[ind2]:
            mating_pool[i] = pop[ind1]
        else:
            mating_pool[i] = pop[ind2]
    return mating_pool
def crossover(mating_pool, lb, ub, pc, eta_c):
    """
    simulated binary crossover (SBX) 模拟二进制交叉
    :param mating_pool: ndarray用于繁殖的个体的决策变量值,形状(noff,nvar),即(要繁殖的个体数,决策变量数)
    :param lb: ndarray,决策变量下界(lower bound),形状(nvar,)
    :param ub: ndarray,决策变量上界(upper bound),形状(nvar,)
    :param pc: float,交叉概率
    :param eta_c: int,扩散因子分布指数,用于控制模拟二进制交叉的分布情况,值越大(>10),分布越均匀
    :return: ndarray,交叉结果,形状为(noff, nvar)
    """
    (noff, nvar) = mating_pool.shape
    nm = int(noff / 2)
    parent1 = mating_pool[:nm]  #拆分
    parent2 = mating_pool[nm:]
    beta = np.zeros((nm, nvar))
    mu = np.random.random((nm, nvar))
    flag1 = mu <= 0.5
    flag2 = ~flag1
    beta[flag1] = (2 * mu[flag1]) ** (1 / (eta_c + 1))
    beta[flag2] = (2 - 2 * mu[flag2]) ** (-1 / (eta_c + 1))
    beta = beta * (-1) ** np.random.randint(0, 2, (nm, nvar))
    beta[np.random.random((nm, nvar)) < 0.5] = 1
    beta[np.tile(np.random.random((nm, 1)) > pc, (1, nvar))] = 1
    offspring1 = (parent1 + parent2) / 2 + beta * (parent1 - parent2) / 2   # 交叉
    offspring2 = (parent1 + parent2) / 2 - beta * (parent1 - parent2) / 2
    offspring = np.concatenate((offspring1, offspring2), axis=0)    # 重新拼接
    offspring = np.min((offspring, np.tile(ub, (noff, 1))), axis=0)
    offspring = np.max((offspring, np.tile(lb, (noff, 1))), axis=0)
    return offspring
def mutation(pop, lb, ub, pm, eta_m):
    """
    polynomial mutation 多项式变异
    :param pop: ndarray用于繁殖的个体的决策变量值,形状(noff,nvar),即(要繁殖的个体数,决策变量数)
    :param lb: ndarray,决策变量下界(lower bound),形状(nvar,)
    :param ub: ndarray,决策变量上界(upper bound),形状(nvar,)
    :param pm: float,变异概率
    :param eta_m: 扰动因子分布指数,用于控制多项式变异的分布形状,值很大时,变异幅度较小,变异的形状更趋于均匀分布
    :return: ndarray,交叉结果,形状为(noff, nvar)
    """
    (npop, nvar) = pop.shape
    lb = np.tile(lb, (npop, 1))
    ub = np.tile(ub, (npop, 1))
    site = np.random.random((npop, nvar)) < pm / nvar
    mu = np.random.random((npop, nvar))
    delta1 = (pop - lb) / (ub - lb)
    delta2 = (ub - pop) / (ub - lb)
    temp = np.logical_and(site, mu <= 0.5)
    pop[temp] += (ub[temp] - lb[temp]) * ((2 * mu[temp] + (1 - 2 * mu[temp]) * (1 - delta1[temp]) ** (eta_m + 1)) ** (1 / (eta_m + 1)) - 1)
    temp = np.logical_and(site, mu > 0.5)
    pop[temp] += (ub[temp] - lb[temp]) * (1 - (2 * (1 - mu[temp]) + 2 * (mu[temp] - 0.5) * (1 - delta2[temp]) ** (eta_m + 1)) ** (1 / (eta_m + 1)))
    pop = np.min((pop, ub), axis=0)
    pop = np.max((pop, lb), axis=0)
    return pop
def environmental_selection(pop, objs, zmin, npop, V):
    """
    NSGA-III environmental selection
    :param pop: ndarray,用于繁殖的个体的决策变量值,形状(noff,nvar),即(要繁殖的个体数,决策变量数)
    :param objs: ndarray,种群中每个个体的目标函数值,形状为 (npop, nobj),nobj 是目标函数的数量。
    :param zmin: ndarray,每个目标函数的最小值。它的形状为 (nobj,)
    :param npop: int,环境选择后要保留的个体数量
    :param V: 权向量,用于计算多目标优化中的 Pareto 前沿,形状通常是 (nv, nobj),即(权向量的数量,目标函数的数量)每个权向量代表一种目标函数权重的组合
    :return:
    """
    pfs, rank = nd_sort(objs)
    nobj = objs.shape[1]
    selected = np.full(pop.shape[0], False)
    ind = 0
    while np.sum(selected) + len(pfs[ind]) <= npop:
        selected[pfs[ind]] = True
        ind += 1
    K = npop - np.sum(selected)
    # select the remaining K solutions
    objs1 = objs[selected]
    objs2 = objs[pfs[ind]]
    npop1 = objs1.shape[0]
    npop2 = objs2.shape[0]
    nv = V.shape[0]
    temp_objs = np.concatenate((objs1, objs2), axis=0)
    t_objs = temp_objs - zmin
    # extreme points
    extreme = np.zeros(nobj)
    w = 1e-6 + np.eye(nobj)
    for i in range(nobj):
        extreme[i] = np.argmin(np.max(t_objs / w[i], axis=1))
    # intercepts
    try:
        hyperplane = np.matmul(np.linalg.inv(t_objs[extreme.astype(int)]), np.ones((nobj, 1)))
        if np.any(hyperplane == 0):
            a = np.max(t_objs, axis=0)
        else:
            a = 1 / hyperplane
    except LinAlgError:
        a = np.max(t_objs, axis=0)
    t_objs /= a.reshape(1, nobj)
    # association
    cosine = 1 - cdist(t_objs, V, 'cosine')
    distance = np.sqrt(np.sum(t_objs ** 2, axis=1).reshape(npop1 + npop2, 1)) * np.sqrt(1 - cosine ** 2)
    dis = np.min(distance, axis=1)
    association = np.argmin(distance, axis=1)
    temp_rho = dict(Counter(association[: npop1]))
    rho = np.zeros(nv)
    for key in temp_rho.keys():
        rho[key] = temp_rho[key]
    # selection
    choose = np.full(npop2, False)
    v_choose = np.full(nv, True)
    while np.sum(choose) < K:
        temp = np.where(v_choose)[0]
        jmin = np.where(rho[temp] == np.min(rho[temp]))[0]
        j = temp[np.random.choice(jmin)]
        I = np.where(np.bitwise_and(~choose, association[npop1:] == j))[0]
        if I.size > 0:
            if rho[j] == 0:
                s = np.argmin(dis[npop1 + I])
            else:
                s = np.random.randint(I.size)
            choose[I[s]] = True
            rho[j] += 1
        else:
            v_choose[j] = False
    selected[np.array(pfs[ind])[choose]] = True
    return pop[selected], objs[selected], rank[selected]
def main(npop, iter, lb, ub, nobj=3, pc=1, pm=1, eta_c=30, eta_m=20):
    """
    The main function
    :param npop: population size
    :param iter: iteration number
    :param lb: lower bound
    :param ub: upper bound
    :param nobj: the dimension of objective space
    :param pc: crossover probability (default = 1)
    :param pm: mutation probability (default = 1)
    :param eta_c: spread factor distribution index (default = 30) 扩散因子分布指数
    :param eta_m: perturbance factor distribution index (default = 20) 扰动因子分布指数
    :return:
    """
    # Step 1. Initialization
    nvar = len(lb)  # the dimension of decision space
    pop = np.random.uniform(lb, ub, (npop, nvar))  # population
    objs = cal_obj(pop, nobj)  # objectives
    V = reference_points(npop, nobj)  # reference vectors
    zmin = np.min(objs, axis=0)  # ideal points
    [pfs, rank] = nd_sort(objs)  # Pareto rank
    # Step 2. The main loop
    for t in range(iter):
        if (t + 1) % 50 == 0:
            print('Iteration: ' + str(t + 1) + ' completed.')
        # Step 2.1. Mating selection + crossover + mutation
        mating_pool = selection(pop, pc, rank)
        off = crossover(mating_pool, lb, ub, pc, eta_c)
        off = mutation(off, lb, ub, pm, eta_m)
        off_objs = cal_obj(off, nobj)
        # Step 2.2. Environmental selection
        zmin = np.min((zmin, np.min(off_objs, axis=0)), axis=0)
        pop, objs, rank = environmental_selection(np.concatenate((pop, off), axis=0), np.concatenate((objs, off_objs), axis=0), zmin, npop, V)
    # Step 3. Sort the results
    pf = objs[rank == 0]
    ax = plt.figure().add_subplot(111, projection='3d')
    ax.view_init(45, 45)
    x = [o[0] for o in pf]
    y = [o[1] for o in pf]
    z = [o[2] for o in pf]
    ax.scatter(x, y, z, color='red')
    ax.set_xlabel('objective 1')
    ax.set_ylabel('objective 2')
    ax.set_zlabel('objective 3')
    plt.title('The Pareto front of DTLZ1')
    plt.savefig('Pareto front')
    plt.show()
if __name__ == '__main__':
    main(91, 400, np.array([0] * 7), np.array([1] * 7))
目录
相关文章
|
1天前
|
存储 算法 前端开发
深入理解操作系统:进程调度与优先级队列算法
【9月更文挑战第25天】在操作系统的复杂世界中,进程调度是维持系统稳定运行的核心机制之一。本文将深入探讨进程调度的基本概念,分析不同的进程调度算法,并着重介绍优先级队列算法的原理和实现。通过简洁明了的语言,我们将一起探索如何优化进程调度,提高操作系统的效率和响应速度。无论你是计算机科学的初学者还是希望深化理解的专业人士,这篇文章都将为你提供有价值的见解。
|
2天前
|
机器学习/深度学习 算法 物联网
探究操作系统的心脏:调度算法的演变与优化
本文旨在深入探讨操作系统中核心组件——调度算法的发展脉络与优化策略。通过分析从单任务到多任务、实时系统的演进过程,揭示调度算法如何作为系统性能瓶颈的解决关键,以及在云计算和物联网新兴领域中的应用前景。不同于传统摘要,本文将注重于概念阐释与实例分析相结合,为读者提供直观且全面的理解视角。
|
15天前
|
算法 人机交互 调度
进程调度算法_轮转调度算法_优先级调度算法_多级反馈队列调度算法
轮转调度算法(RR)是一种常用且简单的调度方法,通过给每个进程分配一小段CPU运行时间来轮流执行。进程切换发生在当前进程完成或时间片用尽时。优先级调度算法则根据进程的紧迫性赋予不同优先级,高优先级进程优先执行,并分为抢占式和非抢占式。多队列调度算法通过设置多个具有不同优先级的就绪队列,采用多级反馈队列优先调度机制,以满足不同类型用户的需求,从而优化整体调度性能。
31 15
|
4天前
|
算法 调度 UED
深入理解操作系统的调度算法
【9月更文挑战第22天】本文通过深入浅出的方式,介绍了操作系统中的核心概念——调度算法。文章首先解释了调度算法的基本定义和重要性,然后详细分析了先来先服务(FCFS)、短作业优先(SJF)以及时间片轮转(RR)三种常见的调度算法。每种算法都配有简单的代码示例,帮助读者更好地理解其工作原理。最后,文章探讨了这些调度算法在现代操作系统中的应用及其优缺点,旨在为读者提供对操作系统调度机制的全面认识。
|
15天前
|
算法 调度
作业调度算法_先来先服务算法_短作业优先算法_高响应比优先算法
本文介绍了作业调度算法,包括先来先服务(FCFS)、短进程优先(SJF)和高响应比优先(HRRN)算法。通过分析进程的到达时间和所需CPU服务时间,计算进程的开始时间、完成时间、平均周转时间和平均带权周转时间,以评估不同算法的性能。FCFS适合长作业,SJF适合短作业,而HRRN则综合了两者的优点。
44 12
|
17天前
|
算法 调度 UED
深入理解操作系统之进程调度算法
【9月更文挑战第9天】在操作系统的心脏跳动中,进程调度扮演着关键角色,就如同指挥家控制交响乐的节奏。本文将通过浅显易懂的语言和生动的比喻,带领读者走进进程调度的世界,探索不同调度算法背后的哲学与实践,以及它们如何影响系统的性能和用户体验。从最简单的先来先服务到复杂的多级队列和反馈循环,我们将一同见证操作系统如何在众多任务中做出选择,确保系统的高效与公平。
|
10天前
|
算法 Linux 调度
探索现代操作系统的心脏:调度算法的演变与挑战
本文旨在深入探讨现代操作系统中至关重要的组成部分——进程调度算法。通过回顾其发展历程,分析当前主流技术,并展望未来趋势,揭示调度算法如何影响系统性能和用户体验。不同于常规摘要,本文将注重于技术的深度解析和背后的设计哲学,为专业开发者提供全面的视角。
22 0
|
10天前
|
人工智能 算法 物联网
探究操作系统的心脏:调度算法的进化与影响
本文深入探讨了操作系统中核心组件—调度算法的发展历程,重点分析了先来先服务、短作业优先、时间片轮转、优先级调度及多级反馈队列等经典调度算法。通过对比各算法的性能特点,如公平性、响应速度和系统吞吐量,阐述了它们在不同应用场景下的适用性和局限性。同时,文章展望了未来调度算法可能的改进方向,包括人工智能驱动的自学习调度策略、云计算环境下的分布式调度优化,以及物联网设备资源限制下的轻量级调度方案。此外,还强调了实时系统对高可靠性和严格时序保证的需求,以及在多核处理器普及背景下,线程级并行化对调度机制提出的新挑战。本文旨在为操作系统设计者、性能优化工程师及计算机科学领域的研究者和学生提供一个全面而深入的
25 0
|
1月前
|
DataWorks 算法 调度
B端算法实践问题之配置脚本以支持blink批处理作业的调度如何解决
B端算法实践问题之配置脚本以支持blink批处理作业的调度如何解决
27 1
|
27天前
|
存储 算法 调度
深入理解操作系统:进程调度的算法与实现
【8月更文挑战第31天】在操作系统的核心,进程调度扮演着关键角色,它决定了哪个进程将获得CPU的使用权。本文不仅剖析了进程调度的重要性和基本概念,还通过实际代码示例,展示了如何实现一个简单的调度算法。我们将从理论到实践,一步步构建起对进程调度的理解,让读者能够把握操作系统中这一复杂而精妙的部分。