Comparing the Dyna-Q Algorithm and Prioritized Sweeping on a Sweeping-Robot Task in Reinforcement Learning (Detailed, with Source Code)


1. Prioritized Sweeping

In a stochastic environment, the magnitude of the change in the value function, and therefore the update priority of each state-action pair, is affected by the estimated transition probabilities. Updates can be ordered by how urgent they are, and this is the idea behind prioritized sweeping.

Prioritized sweeping is a commonly used way of distributing computation to make planning more efficient. To a large extent it avoids the inefficiency of choosing states and actions uniformly at random: a priority queue PQueue stores the state-action pairs whose value estimates would change substantially, ordered by a priority P given by the magnitude of the change in the action-value function, and simulated experience is then generated from this queue in priority order.
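
Concretely, after a real transition (s, a, r, s'), the priority of (s, a) is the absolute one-step TD error, and the pair is queued only when that error exceeds a threshold. A minimal sketch of this step is shown below; the names Q, gamma, theta, pqueue and push_if_urgent are illustrative, and the article's code implements the same idea in its queue_put method using string keys.

import heapq

gamma, theta = 0.9, 0.1      # discount factor and priority threshold (illustrative values)
pqueue = []                  # max-priority behaviour simulated by pushing negated priorities

def push_if_urgent(Q, s, a, r, s_next, actions):
    # Queue (s, a) when its absolute one-step TD error exceeds the threshold.
    p = abs(r + gamma * max(Q.get((s_next, b), 0.0) for b in actions) - Q.get((s, a), 0.0))
    if p > theta:
        heapq.heappush(pqueue, (-p, (s, a)))  # heapq is a min-heap, so store -p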

The algorithm flowchart is as follows.
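
Continuing the sketch above, the planning loop described by the flowchart repeatedly pops the most urgent pair, applies a Q-learning backup to it, and then re-queues every predecessor pair whose modelled successor is the popped state. Here model is assumed to map (s, a) to (r, s'); the priority_sweeping method in the code below does the same thing with string keys.

def prioritized_sweeps(Q, model, actions, alpha=0.1):
    # Drain the queue: pop the most urgent pair, back it up, then re-queue its predecessors.
    while pqueue:
        _, (s, a) = heapq.heappop(pqueue)
        r, s_next = model[(s, a)]
        best_next = max(Q.get((s_next, b), 0.0) for b in actions)
        old_q = Q.get((s, a), 0.0)
        Q[(s, a)] = old_q + alpha * (r + gamma * best_next - old_q)
        # Any pair (s_bar, a_bar) whose modelled successor is s may now also be urgent.
        for (s_bar, a_bar), (r_bar, s2) in model.items():
            if s2 == s:
                push_if_urgent(Q, s_bar, a_bar, r_bar, s, actions)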

2. Experiments on the Sweeping Robot

In the sweeping-robot environment, we train Dyna-Q with n = 10 and n = 30 planning steps per real step, together with the prioritized sweeping algorithm. The transition reward for reaching the trash state or the charging state is set to +1, every other transition is rewarded with -0.1, the discount factor is 0.9, and the learning rate is 0.1.
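
With these settings both algorithms use the tabular Q-learning backup Q(s,a) ← Q(s,a) + 0.1·[r + 0.9·max_a' Q(s',a') − Q(s,a)]; Dyna-Q additionally repeats this backup n times per real step on transitions replayed from the learned model. A minimal sketch of that planning step is given below; the names Q, model and actions are illustrative and mirror the spirit of the q_planning method in the code later in this article, not its exact string-keyed implementation.

from random import choice

def dyna_q_planning(Q, model, actions, n, alpha=0.1, gamma=0.9):
    # Replay n randomly chosen modelled transitions and apply the same Q-learning backup.
    for _ in range(n):
        if not model:
            return
        s, a = choice(list(model.keys()))
        r, s_next = model[(s, a)]
        best_next = max(Q.get((s_next, b), 0.0) for b in actions)
        old_q = Q.get((s, a), 0.0)
        Q[(s, a)] = old_q + alpha * (r + gamma * best_next - old_q)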

The experiments show that, when every reward other than those for reaching the trash and charging states is negative, prioritized sweeping performs better than Dyna-Q and finds the optimal path faster.

The results are shown below.

 

Now change the setting so that every reward other than those for reaching the trash and charging states is 0, and train with the same algorithms as above. In this case prioritized sweeping performs slightly worse than Dyna-Q.
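
In terms of the GridWorldEnv class defined in the code below, this change only touches the reward configuration: the per-step default reward becomes 0 instead of -0.1, while the trash and charging cells keep their +1 rewards. A short sketch of the two configurations follows; the grid size used here is an assumption, not a value taken from the article.

# Figure 1 setting: every non-terminal transition costs -0.1
env = GridWorldEnv(n_width=11, n_height=7, default_reward=-0.1)   # grid size is an assumption
# Figure 2 setting: non-terminal transitions are free
# env = GridWorldEnv(n_width=11, n_height=7, default_reward=0.0)
env.rewards = [(8, 5, 1), (1, 6, 1)]   # trash and charging cells both give +1
env.ends = [(8, 5), (1, 6)]
env.refresh_setting()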

Possible reasons include:

1. The number of Q-value updates is too small.

2. The state space of the sweeping-robot environment is small; prioritized sweeping has a clearer advantage in large state spaces.

3. Prioritized sweeping only performs planning updates on states close to the terminal positions.

The results are shown below.

The code for Figure 1 is as follows.

from random import random, choice
import queue as Q
import matplotlib.pyplot as plt
from matplotlib.font_manager import FontProperties
import gym
from gym import spaces
from gym.utils import seeding
class Grid(object):
    def __init__(self, x:int = None,
                 y:int = None,
                 type:int = 0,
                 reward:float = 0.0):
        self.x = x                          # x coordinate
        self.y = y
        self.type = type                    # cell type (0: empty; 1: obstacle or boundary)
        self.reward = reward                # immediate reward of this cell
        self.name = None                    # name of this cell
        self._update_name()
    def _update_name(self):
        self.name = "X{0}-Y{1}".format(self.x, self.y)
    def __str__(self):
        return "name:{3}, x:{0}, y:{1}, type:{2}".format(self.x,
                                                         self.y,
                                                         self.type,
                                                         self.name
                                                         )
class GridMatrix(object):
    def __init__(self, n_width:int,                 # number of cells horizontally
                 n_height:int,                # number of cells vertically
                 default_type:int = 0,        # default cell type
                 default_reward:float = 0.0,  # default immediate reward
                 ):
        self.grids = None
        self.n_height = n_height
        self.n_width = n_width
        self.len = n_width * n_height
        self.default_reward = default_reward
        self.default_type = default_type
        self.reset()
    def reset(self):
        self.grids = []
        for x in range(self.n_height):
            for y in range(self.n_width):
                self.grids.append(Grid(x,
                                       y,
                                       self.default_type,
                                       self.default_reward))
    def get_grid(self, x, y=None):
        '''Return the grid object at a given position.
        args: the position, given either as x, y or as a single tuple x
        return: grid object
        '''
        xx, yy = None, None
        if isinstance(x, int):
            xx, yy = x, y
        elif isinstance(x, tuple):
            xx, yy = x[0], x[1]
        assert(xx >= 0 and yy >= 0 and xx < self.n_width and yy < self.n_height), "任意坐标值应在合理区间"
        index = yy * self.n_width + xx
        return self.grids[index]
    def set_reward(self, x, y, reward):
        grid = self.get_grid(x, y)
        if grid is not None:
            grid.reward = reward
        else:
            raise("grid doesn't exist")
    def set_type(self, x, y, type):
        grid = self.get_grid(x, y)
        if grid is not None:
            grid.type = type
        else:
            raise("grid doesn't exist")
    def get_reward(self, x, y):
        grid = self.get_grid(x, y)
        if grid is None:
            return None
        return grid.reward
    def get_type(self, x, y):
        grid = self.get_grid(x, y)
        if grid is None:
            return None
        return grid.type
class GridWorldEnv(gym.Env):
    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second': 30
    }
    def __init__(self, n_width: int=5,
                 n_height: int = 5,
                 u_size=40,
                 default_reward: float = 0.0,
                 default_type=0):
        self.u_size = u_size                        # drawing size of one cell
        self.n_width = n_width                      # grid-world width (in cells)
        self.n_height = n_height                    # grid-world height (in cells)
        self.width = u_size * n_width               # screen width
        self.height = u_size * n_height             # screen height
        self.default_reward = default_reward
        self.default_type = default_type
        self.grids = GridMatrix(n_width=self.n_width,
                                n_height=self.n_height,
                                default_reward=self.default_reward,
                                default_type=self.default_type)
        self.reward = 0                             # for rendering
        self.action = None                          # for rendering
        # 0,1,2,3 represent up, down, left, right
        self.action_space = spaces.Discrete(4)
        # The observation space is discrete: one state per cell
        self.observation_space = spaces.Discrete(self.n_height * self.n_width)
        self.ends = [(0, 0)]  # terminal cell coordinates; there can be several
        self.start = (0, 4)  # start cell coordinate; there is exactly one
        self.types = [(2, 2, 1)]
        self.rewards = []
        self.refresh_setting()
        self.viewer = None  # rendering viewer object
        self.seed()  # create a random seed
        self.reset()
    def seed(self, seed=None):
        # Create the seed needed for randomization and return an np_random object for later random generation
        self.np_random, seed = seeding.np_random(seed)
        return [seed]
    def step(self, action):
        assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))
        self.action = action                        # action for rendering
        old_x, old_y = self._state_to_xy(self.state)
        new_x, new_y = old_x, old_y
        if action == 0: new_y += 1  # up
        elif action == 1: new_y -= 1  # down
        elif action == 2: new_x -= 1  # left
        elif action == 3: new_x += 1  # right
        # boundary effect
        if new_x < 0: new_x = 0
        if new_x >= self.n_width: new_x = self.n_width - 1
        if new_y < 0: new_y = 0
        if new_y >= self.n_height: new_y = self.n_height - 1
        # wall effect:
        # cells of type 1 are obstacles and cannot be entered
        if self.grids.get_type(new_x, new_y) == 1:
            new_x, new_y = old_x, old_y
        self.reward = self.grids.get_reward(new_x, new_y)
        done = self._is_end_state(new_x, new_y)
        self.state = self._xy_to_state(new_x, new_y)
        # expose all grid-world information through info
        info = {"x": new_x, "y": new_y, "grids": self.grids}
        return self.state, self.reward, done, info
    # Convert a state index into (x, y) coordinates
    def _state_to_xy(self, s):
        x = s % self.n_width
        y = int((s - x) / self.n_width)
        return x, y
    def _xy_to_state(self, x, y=None):
        if isinstance(x, int):
            assert (isinstance(y, int)), "incomplete Position info"
            return x + self.n_width * y
        elif isinstance(x, tuple):
            return x[0] + self.n_width * x[1]
        return -1  # unknown state
    def refresh_setting(self):
        '''After building a grid world with this class, the user may change the type or
        reward of some cells; call this method to make those changes take effect.
        '''
        for x, y, r in self.rewards:
            self.grids.set_reward(x, y, r)
        for x, y, t in self.types:
            self.grids.set_type(x, y, t)
    def reset(self):
        self.state = self._xy_to_state(self.start)
        return self.state
    # Check whether the given position is a terminal state
    def _is_end_state(self, x, y=None):
        if y is not None:
            xx, yy = x, y
        elif isinstance(x, int):
            xx, yy = self._state_to_xy(x)
        else:
            assert (isinstance(x, tuple)), "坐标数据不完整"
            xx, yy = x[0], x[1]
        for end in self.ends:
            if xx == end[0] and yy == end[1]:
                return True
        return False
    # Graphical rendering
    def render(self, mode='human', close=False):
        if close:
            if self.viewer is not None:
                self.viewer.close()
                self.viewer = None
            return
        zero = (0, 0)
        u_size = self.u_size
        m = 2                                       # gap between cells, in pixels
        # If the viewer has not been created yet, initialise all of the screen elements.
        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(self.width, self.height)
            # draw the cells
            for x in range(self.n_width):
                for y in range(self.n_height):
                    v = [(x * u_size + m, y * u_size + m),
                         ((x + 1) * u_size - m, y * u_size + m),
                         ((x + 1) * u_size - m, (y + 1) * u_size - m),
                         (x * u_size + m, (y + 1) * u_size - m)]
                    rect = rendering.FilledPolygon(v)
                    r = self.grids.get_reward(x, y) / 10
                    if r < 0:
                        rect.set_color(0.9 - r, 0.9 + r, 0.9 + r)
                    elif r > 0:
                        rect.set_color(0.3, 0.5 + r, 0.3)
                    else:
                        rect.set_color(0.9, 0.9, 0.9)
                    self.viewer.add_geom(rect)
                    # draw the cell outline
                    v_outline = [(x * u_size + m, y * u_size + m),
                                 ((x + 1) * u_size - m, y * u_size + m),
                                 ((x + 1) * u_size - m, (y + 1) * u_size - m),
                                 (x * u_size + m, (y + 1) * u_size - m)]
                    outline = rendering.make_polygon(v_outline, False)
                    outline.set_linewidth(3)
                    if self._is_end_state(x, y):
                        # give terminal cells a golden outline
                        outline.set_color(0.9, 0.9, 0)
                        self.viewer.add_geom(outline)
                    if self.start[0] == x and self.start[1] == y:
                        outline.set_color(0.5, 0.5, 0.8)
                        self.viewer.add_geom(outline)
                    if self.grids.get_type(x, y) == 1:  # obstacle cells are drawn dark grey
                        rect.set_color(0.3, 0.3, 0.3)
                    else:
                        pass
            # draw the agent
            self.agent = rendering.make_circle(u_size / 4, 30, True)
            self.agent.set_color(1.0, 1.0, 0.0)
            self.viewer.add_geom(self.agent)
            self.agent_trans = rendering.Transform()
            self.agent.add_attr(self.agent_trans)
            # update the agent's position
        x, y = self._state_to_xy(self.state)
        self.agent_trans.set_translation((x + 0.5) * u_size, (y + 0.5) * u_size)
        return self.viewer.render(return_rgb_array= mode == 'rgb_array')
# Agent definition
class Agent():
    def __init__(self, env):
        self.episode = 1
        self.Q = {}
        self.actions = [0, 1, 2, 3]
        self.position = env.start
        self.model={}
        self.priority_mode={}
        self.q=Q.PriorityQueue()
    # Build/update the learned model
    def make_model(self, pos, act, reward, next_state):
        self.model["{0},{1}".format(pos, act)] = "{0},{1}".format(reward, next_state)
    # Planning with the learned model (Dyna-Q)
    def q_planning(self,n):
        for i in range(0, n):
            a = [i for i in self.model.keys()]
            done=False
            if a != []:
                str = choice(a)
                pos = str.split(",")[0]+","+str.split(",")[1]
                act = int(str.split(",")[2])
                reward = float(self.model[str].split(",")[0])
                next_state = self.model[str].split(",")[1]+","+self.model[str].split(",")[2]
                if next_state == "(8,5)" or next_state == "(1,6)":
                    done = True
                self.updateQ(pos, act, next_state, reward, done)
    # Push a state-action pair into the priority queue
    def queue_put(self,s,a):
        self.chaxunQ(s, a)
        reward = self.model["{0},{1}".format(s, a)].split(",")[0]
        next_state = self.model["{0},{1}".format(s, a)].split(",")[1]+","+self.model["{0},{1}".format(s,a)].split(",")[2]
        argmax_action = self.performmax(next_state)
        old_q = self.Q["{0},{1}".format(s, a)]
        new_q = self.Q["{0},{1}".format(next_state, argmax_action)]
        p = abs(float(reward)+0.9*new_q-old_q)
        if p > maque:
            self.q.put([-p, "{0},{1}".format(s, a)])  # negate p so the largest error is popped first (PriorityQueue is a min-heap)
    # Prioritized sweeping
    def priority_sweeping(self):
        done = False
        while self.q.empty() == False:
            elem = self.q.get()
            pos = elem[1].split(",")[0]+","+elem[1].split(",")[1]
            act = elem[1].split(",")[2]
            next_state = self.model["{0},{1}".format(pos, act)].split(",")[1]+","+self.model["{0},{1}".format(pos, act)].split(",")[2]
            reward = self.model["{0},{1}".format(pos, act)].split(",")[0]
            if next_state == "(0,0)" or next_state == "(4,3)":
                done = True
            self.updateQ(pos, act, next_state, float(reward), done)
            for k in self.model:
                state = k.split(",")[0]+","+k.split(",")[1]
                state_act = k.split(",")[2]
                next_pos = self.model[k].split(",")[1]+","+self.model[k].split(",")[2]
                if next_pos == pos:
                    self.queue_put(state, state_act)
    def chaxunQ(self, pos, act):
        judge = False
        for i in self.Q:
            if i == "{0},{1}".format(pos, act):
                judge = True
                break
        if judge == True:
            return True
        else:
            self.Q["{0},{1}".format(pos, act)] = float(format(random()/10000, '.3f'))
            return
    # Update the state-action value function Q
    def updateQ(self, pos, action, next_pos, reward, done):
        if done == False:
            self.chaxunQ(pos, action)
            old_q = self.Q["{0},{1}".format(pos, action)]
            action1 = self.performmax(next_pos)
            # self.chaxunQ(next_pos, action1)
            new_q = self.Q["{0},{1}".format(next_pos, action1)]
            old_q = old_q + 0.1 * (reward+0.9 * new_q - old_q)
            self.Q["{0},{1}".format(pos, action)] = float(format(old_q, '.3f'))
        else:
            self.chaxunQ(pos, action)
            self.Q["{0},{1}".format(pos, action)] = float(format(reward, '.3f'))
            # print(pos, action,reward)
    # Action-selection policy (random action with probability 1/episode, otherwise greedy)
    def perform(self, pos):
        eplison = random()
        self.chaxunQ(pos, choice([0, 1, 2, 3]))
        if eplison > 1/self.episode:
            maxq = -1000
            act = ""
            for i in self.Q:
                list = i.split(",")
                state = list[0] + "," + list[1]
                if state == str(pos):
                    if self.Q[i] > maxq:
                        maxq = self.Q[i]
                        act = list[2]
            return int(act)
        else:
            return choice([0, 1, 2, 3])
    # argmaxQ
    def performmax(self, pos):
        maxq = -1000
        str1 = ""
        self.chaxunQ(pos,choice([0,1,2,3]))
        for i in self.Q:
            list = i.split(",")
            state = list[0]+","+list[1]
            if state == str(pos):
                if self.Q[i] > maxq:
                    maxq = self.Q[i]
                    str1 = list[2]
        return int(str1)
# Dyna-Q
def run(n):
    agent = Agent(env)
    total_j = 0
    total_r=0
    a=[]
    b=[]
    # env.types = [(2,4,0)]
    env.refresh_setting()
    for i in range(0, 300):
        done = False
        j = 0
        env.reset()
        r = 0
        while done == False and j<50:
            j = j + 1
            state = env._state_to_xy(env.state)
            action = agent.perform(state)
            next_state, reward, done, info = env.step(action)
            next_state = env._state_to_xy(next_state)
            # update the model
            agent.make_model(state, action, reward, next_state)
            # update the Q value
            agent.updateQ(state, action, next_state, reward, done)
            r += reward
            # planning with the model
            agent.q_planning(n)
        if i >= 2:
            total_r += r
            a.append(total_r)
        agent.episode += 1
        total_j += j
        b.append(j)
        print("回合={0},步数={1},奖赏={2}".format(i, j, '%.3f' % r))
    # return (np.array(a)/np.array(total_j)).tolist()
    # return a
    return a
# Prioritized sweeping
def p_run():
    agent = Agent(env)
    a = []
    total_r = 0
    total_j = 0
    b = []
    for i in range(0, 300):
        done = False
        j = 0
        env.reset()
        r = 0
        while done == False and j < 50:
            j = j+1
            state = env._state_to_xy(env.state)
            action = agent.perform(state)
            next_state, reward, done, info = env.step(action)
            next_state = env._state_to_xy(next_state)
            agent.make_model(state, action, reward, next_state)
            agent.updateQ(state, action, next_state, reward, done)
            # push into the priority queue
            agent.queue_put(state, action)
            r += reward
        if i >= 2:
            total_r += r
            a.append(total_r)
        # prioritized sweeping
        agent.priority_sweeping()
        agent.episode += 1
        # print(agent.Q)
        total_j += j
        b.append(j)
        print("回合={0},步数={1},奖赏={2}".format(i, j, '%.3f' % r))
    return a
if __name__ == "__main__":
    # The lines below reconstruct the main block, part of which was garbled in the article;
    # the grid size, priority threshold and font are assumptions used to make the script run.
    env = GridWorldEnv(n_width=11, n_height=7, default_reward=-0.1)
    maque = 0.1  # priority threshold used by queue_put (assumed; original value not shown)
    env.types = [(1, 2, 1), (2, 2, 1), (3, 2, 1), (4, 2, 1), (5, 2, 1), (6, 2, 1), (7, 2, 1), (8, 2, 1), (9, 2, 1),
                 (10, 2, 1)]
    env.rewards = [(8, 5, 1), (1, 6, 1)]  # reward settings
    env.start = (4, 0)
    env.ends = [(8, 5), (1, 6)]
    env.refresh_setting()
    font1 = FontProperties(size=12)  # assumed; the original loaded a Chinese font here
    x = range(2, 300)
    ln1, = plt.plot(x, p_run(), label=u"n=0")
    ln2, = plt.plot(x, run(10), label=u"n=10")
    plt.xlabel(u'情节', fontproperties=font1)
    plt.ylabel(u'累计奖励', fontproperties=font1)
    plt.legend()
    plt.show()

The code for Figure 2 is as follows.

import 扫地机器人gym环境 as bg
from random import random, choice
import queue as Q
import matplotlib.pyplot as plt
from matplotlib.font_manager import FontProperties
import gym
from gym import spaces
from gym.utils import seeding
import numpy as np
class Grid(object):
    def __init__(self, x: int = None,
                 y: int = None,
                 type: int = 0,
                 reward: float = 0.0):
        self.x = x  # 坐标x
        self.y = y
        self.type = type  # 类别值(0:空;1:障碍或边界)
        self.reward = reward  # 该格子的即时奖励
        self.name = None  # 该格子的名称
        self._update_name()
    def _update_name(self):
        self.name = "X{0}-Y{1}".format(self.x, self.y)
    def __str__(self):
        return "name:{3}, x:{0}, y:{1}, type:{2}".format(self.x,
                                                         self.y,
                                                         self.type,
                                                         self.name
                                                         )
class GridMatrix(object):
    def __init__(self, n_width: int,  # 水平方向格子数
                 n_height: int,  # 竖直方向格子数
                 default_type: int = 0,  # 默认类型
                 default_reward: float = 0.0,  # 默认即时奖励值
                 ):
        self.grids = None
        self.n_height = n_height
        self.n_width = n_width
        self.len = n_width * n_height
        self.default_reward = default_reward
        self.default_type = default_type
        self.reset()
    def reset(self):
        self.grids = []
        for x in range(self.n_height):
            for y in range(self.n_width):
                self.grids.append(Grid(x,
                                       y,
                                       self.default_type,
                                       self.default_reward))
    def get_grid(self, x, y=None):
        '''获取一个格子信息
        args:坐标信息,由x,y表示或仅有一个类型为tuple的x表示
        return:grid object
        '''
        xx, yy = None, None
        if isinstance(x, int):
            xx, yy = x, y
        elif isinstance(x, tuple):
            xx, yy = x[0], x[1]
        assert (xx >= 0 and yy >= 0 and xx < self.n_width and yy < self.n_height), "任意坐标值应在合理区间"
        index = yy * self.n_width + xx
        return self.grids[index]
    def set_reward(self, x, y, reward):
        grid = self.get_grid(x, y)
        if grid is not None:
            grid.reward = reward
        else:
            raise ("grid doesn't exist")
    def set_type(self, x, y, type):
        grid = self.get_grid(x, y)
        if grid is not None:
            grid.type = type
        else:
            raise ("grid doesn't exist")
    def get_reward(self, x, y):
        grid = self.get_grid(x, y)
        if grid is None:
            return None
        return grid.reward
    def get_type(self, x, y):
        grid = self.get_grid(x, y)
        if grid is None:
            return None
        return grid.type
# 格子世界环境
class GridWorldEnv(gym.Env):
    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second': 30
    }
    def __init__(self, n_width: int = 5,
                 n_height: int = 5,
                 u_size=40,
                 default_reward: float = 0.0,
                 default_type=0):
        self.u_size = u_size  # 当前格子绘制尺寸
        self.n_width = n_width  # 格子世界宽度(以格子数计)
        self.n_height = n_height  # 高度
        self.width = u_size * n_width  # 场景宽度 screen width
        self.height = u_size * n_height  # 场景长度
        self.default_reward = default_reward
        self.default_type = default_type
        self.grids = GridMatrix(n_width=self.n_width,
                                n_height=self.n_height,
                                default_reward=self.default_reward,
                                default_type=self.default_type)
        self.reward = 0  # for rendering
        self.action = None  # for rendering
        # 0,1,2,3 represent up, down, left, right
        self.action_space = spaces.Discrete(4)
        # 观察空间由low和high决定
        self.observation_space = spaces.Discrete(self.n_height * self.n_width)
        self.ends = [(0, 0)]  # 终止格子坐标,可以有多个
        self.start = (0, 4)  # 起始格子坐标,只有一个
        self.types = [(2, 2, 1)]
        self.rewards = []
        self.refresh_setting()
        self.viewer = None  # 图形接口对象
        self.seed()  # 产生一个随机子
        self.reset()
    def seed(self, seed=None):
        # 产生一个随机化时需要的种子,同时返回一个np_random对象,支持后续的随机化生成操作
        self.np_random, seed = seeding.np_random(seed)
        return [seed]
    def step(self, action):
        assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))
        self.action = action  # action for rendering
        old_x, old_y = self._state_to_xy(self.state)
        new_x, new_y = old_x, old_y
        if action == 0:
            new_y += 1  # up
        elif action == 1:
            new_y -= 1  # down
        elif action == 2:
            new_x -= 1  # left
        elif action == 3:
            new_x += 1  # right
        # boundary effect
        if new_x < 0: new_x = 0
        if new_x >= self.n_width: new_x = self.n_width - 1
        if new_y < 0: new_y = 0
        if new_y >= self.n_height: new_y = self.n_height - 1
        # wall effect:
        # 类型为1的格子为障碍格子,不可进入
        if self.grids.get_type(new_x, new_y) == 1:
            new_x, new_y = old_x, old_y
        self.reward = self.grids.get_reward(new_x, new_y)
        done = self._is_end_state(new_x, new_y)
        self.state = self._xy_to_state(new_x, new_y)
        # 提供格子世界所有的信息在info内
        info = {"x": new_x, "y": new_y, "grids": self.grids}
        return self.state, self.reward, done, info
    # 将状态变为横纵坐标
    def _state_to_xy(self, s):
        x = s % self.n_width
        y = int((s - x) / self.n_width)
        return x, y
    def _xy_to_state(self, x, y=None):
        if isinstance(x, int):
            assert (isinstance(y, int)), "incomplete Position info"
            return x + self.n_width * y
        elif isinstance(x, tuple):
            return x[0] + self.n_width * x[1]
        return -1  # 未知状态
    def refresh_setting(self):
        '''用户在使用该类创建格子世界后可能会修改格子世界某些格子类型或奖励值
        的设置,修改设置后通过调用该方法使得设置生效。
        '''
        for x, y, r in self.rewards:
            self.grids.set_reward(x, y, r)
        for x, y, t in self.types:
            self.grids.set_type(x, y, t)
    def reset(self):
        self.state = self._xy_to_state(self.start)
        return self.state
    # 判断是否是终止状态
    def _is_end_state(self, x, y=None):
        if y is not None:
            xx, yy = x, y
        elif isinstance(x, int):
            xx, yy = self._state_to_xy(x)
        else:
            assert (isinstance(x, tuple)), "坐标数据不完整"
            xx, yy = x[0], x[1]
        for end in self.ends:
            if xx == end[0] and yy == end[1]:
                return True
        return False
    # 图形化界面
    def render(self, mode='human', close=False):
        if close:
            if self.viewer is not None:
                self.viewer.close()
                self.viewer = None
            return
        zero = (0, 0)
        u_size = self.u_size
        m = 2  # 格子之间的间隙尺寸
        # 如果还没有设定屏幕对象,则初始化整个屏幕具备的元素。
        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(self.width, self.height)
            # 绘制格子
            for x in range(self.n_width):
                for y in range(self.n_height):
                    v = [(x * u_size + m, y * u_size + m),
                         ((x + 1) * u_size - m, y * u_size + m),
                         ((x + 1) * u_size - m, (y + 1) * u_size - m),
                         (x * u_size + m, (y + 1) * u_size - m)]
                    rect = rendering.FilledPolygon(v)
                    r = self.grids.get_reward(x, y) / 10
                    if r < 0:
                        rect.set_color(0.9 - r, 0.9 + r, 0.9 + r)
                    elif r > 0:
                        rect.set_color(0.3, 0.5 + r, 0.3)
                    else:
                        rect.set_color(0.9, 0.9, 0.9)
                    self.viewer.add_geom(rect)
                    # 绘制边框
                    v_outline = [(x * u_size + m, y * u_size + m),
                                 ((x + 1) * u_size - m, y * u_size + m),
                                 ((x + 1) * u_size - m, (y + 1) * u_size - m),
                                 (x * u_size + m, (y + 1) * u_size - m)]
                    outline = rendering.make_polygon(v_outline, False)
                    outline.set_linewidth(3)
                    if self._is_end_state(x, y):
                        # 给终点方格添加金黄色边框
                        outline.set_color(0.9, 0.9, 0)
                        self.viewer.add_geom(outline)
                    if self.start[0] == x and self.start[1] == y:
                        outline.set_color(0.5, 0.5, 0.8)
                        self.viewer.add_geom(outline)
                    if self.grids.get_type(x, y) == 1:  # 障碍格子用深灰色表示
                        rect.set_color(0.3, 0.3, 0.3)
                    else:
                        pass
            # 绘制个体
            self.agent = rendering.make_circle(u_size / 4, 30, True)
            self.agent.set_color(1.0, 1.0, 0.0)
            self.viewer.add_geom(self.agent)
            self.agent_trans = rendering.Transform()
            self.agent.add_attr(self.agent_trans)
            # 更新个体位置
        x, y = self._state_to_xy(self.state)
        self.agent_trans.set_translation((x + 0.5) * u_size, (y + 0.5) * u_size)
        return self.viewer.render(return_rgb_array=mode == 'rgb_array')
# 环境参数设定
class Agent():
    def __init__(self, env):
        self.episode = 1
        self.Q = {}
        self.actions = [0, 1, 2, 3]
        self.position = env.start
        self.model = {}
        self.priority_mode = {}
        self.q = Q.PriorityQueue()
    # 建立模型
    def make_model(self, pos, act, reward, next_state):
        self.model["{0},{1}".format(pos, act)] = "{0},{1}".format(reward, next_state)
    # 模型规划
    def q_planning(self, n):
        for i in range(0, n):
            a = [i for i in self.model.keys()]
            done = False
            if a != []:
                str = choice(a)
                pos = str.split(",")[0] + "," + str.split(",")[1]
                act = int(str.split(",")[2])
                reward = float(self.model[str].split(",")[0])
                next_state = self.model[str].split(",")[1] + "," + self.model[str].split(",")[2]
                if next_state == "(8,5)" or next_state == "(1,6)":
                    done = True
                self.updateQ(pos, act, next_state, reward, done)
    # 加入优先序列
    def queue_put(self, s, a):
        self.chaxunQ(s, a)
        reward = self.model["{0},{1}".format(s, a)].split(",")[0]
        next_state = self.model["{0},{1}".format(s, a)].split(",")[1] + "," + \
                     self.model["{0},{1}".format(s, a)].split(",")[2]
        argmax_action = self.performmax(next_state)
        old_q = self.Q["{0},{1}".format(s, a)]
        new_q = self.Q["{0},{1}".format(next_state, argmax_action)]
        p = abs(float(reward) + 0.9 * new_q - old_q)
        if p > maque:
            self.q.put([-p, "{0},{1}".format(s, a)])  # negate p so the largest error is popped first (PriorityQueue is a min-heap)
    # 优先扫描
    def priority_sweeping(self):
        done = False
        while self.q.empty() == False:
            elem = self.q.get()
            pos = elem[1].split(",")[0] + "," + elem[1].split(",")[1]
            act = elem[1].split(",")[2]
            next_state = self.model["{0},{1}".format(pos, act)].split(",")[1] + "," + \
                         self.model["{0},{1}".format(pos, act)].split(",")[2]
            reward = self.model["{0},{1}".format(pos, act)].split(",")[0]
            if next_state == "(0,0)" or next_state == "(4,3)":
                done = True
            self.updateQ(pos, act, next_state, float(reward), done)
            for k in self.model:
                state = k.split(",")[0] + "," + k.split(",")[1]
                state_act = k.split(",")[2]
                next_pos = self.model[k].split(",")[1] + "," + self.model[k].split(",")[2]
                if next_pos == pos:
                    self.queue_put(state, state_act)
    def chaxunQ(self, pos, act):
        judge = False
        for i in self.Q:
            if i == "{0},{1}".format(pos, act):
                judge = True
                break
        if judge == True:
            return True
        else:
            self.Q["{0},{1}".format(pos, act)] = float(format(random() / 10000, '.3f'))
            return
    # 更新状态动作值Q函数
    def updateQ(self, pos, action, next_pos, reward, done):
        if done == False:
            self.chaxunQ(pos, action)
            old_q = self.Q["{0},{1}".format(pos, action)]
            action1 = self.performmax(next_pos)
            # self.chaxunQ(next_pos, action1)
            new_q = self.Q["{0},{1}".format(next_pos, action1)]
            old_q = old_q + 0.1 * (reward + 0.9 * new_q - old_q)
            self.Q["{0},{1}".format(pos, action)] = float(format(old_q, '.3f'))
        else:
            self.chaxunQ(pos, action)
            self.Q["{0},{1}".format(pos, action)] = float(format(reward, '.3f'))
            # print(pos, action,reward)
    # 动作选取策略
    def perform(self, pos):
        eplison = random()
        self.chaxunQ(pos, choice([0, 1, 2, 3]))
        if eplison > 1 / self.episode:
            maxq = -1000
            act = ""
            for i in self.Q:
                list = i.split(",")
                state = list[0] + "," + list[1]
                if state == str(pos):
                    if self.Q[i] > maxq:
                        maxq = self.Q[i]
                        act = list[2]
            return int(act)
        else:
            return choice([0, 1, 2, 3])
    # argmaxQ
    def performmax(self, pos):
        maxq = -1000
        str1 = ""
        self.chaxunQ(pos, choice([0, 1, 2, 3]))
        for i in self.Q:
            list = i.split(",")
            state = list[0] + "," + list[1]
            if state == str(pos):
                if self.Q[i] > maxq:
                    maxq = self.Q[i]
                    str1 = list[2]
        return int(str1)
# Dyna-Q
def run(n):
    agent = Agent(env)
    total_j = 0
    total_r = 0
    a = []
    b = []
    # env.types = [(2,4,0)]
    env.refresh_setting()
    for i in range(0, 300):
        done = False
        j = 0
        env.reset()
        r = 0
        while done == False and j < 500:
            j = j + 1
            state = env._state_to_xy(env.state)
            action = agent.perform(state)
            next_state, reward, done, info = env.step(action)
            next_state = env._state_to_xy(next_state)
            # 模型更新
            agent.make_model(state, action, reward, next_state)
            # 更新Q值
            agent.updateQ(state, action, next_state, reward, done)
            r += reward
            # 模型规划
            agent.q_planning(n)
        total_r += r
        a.append(total_r)
        agent.episode += 1
        total_j += j
        b.append(j)
        print("回合={0},步数={1},奖赏={2}".format(i, j, '%.3f' % r))
    return (np.array(a) / np.array(total_j)).tolist()
# 优先扫描
def p_run():
    agent = Agent(env)
    a = []
    total_r = 0
    total_j = 0
    b = []
    for i in range(0, 300):
        done = False
        j = 0
        env.reset()
        r = 0
        while done == False and j < 500:
            j = j + 1
            state = env._state_to_xy(env.state)
            action = agent.perform(state)
            next_state, reward, done, info = env.step(action)
            next_state = env._state_to_xy(next_state)
            agent.make_model(state, action, reward, next_state)
            agent.updateQ(state, action, next_state, reward, done)
            # 加入优先序列
            agent.queue_put(state, action)
            r += reward
        total_r += r
        a.append(total_r)
        # 优先扫描
        agent.priority_sweeping()
        agent.episode += 1
        total_j += j
        b.append(j)
        print("回合={0},步数={1},奖赏={2}".format(i, j, '%.3f' % r))
    return (np.array(a) / np.array(total_j)).tolist()
if __name__ == "__main__":
    # The lines below reconstruct the garbled main block; the grid size, obstacle layout,
    # priority threshold and font are assumptions used to make the script run.
    env = GridWorldEnv(n_width=11, n_height=7, default_reward=0.0)
    maque = 0.1  # priority threshold used by queue_put (assumed; original value not shown)
    env.types = [(x, 2, 1) for x in range(1, 11)]  # assumed, mirroring the Figure 1 script
    env.rewards = [(8, 5, 1), (1, 6, 1)]  # reward settings
    env.start = (6, 0)
    env.ends = [(8, 5), (1, 6)]
    env.refresh_setting()
    font1 = FontProperties(size=12)  # assumed; the original loaded a Chinese font here
    x = range(0, 300)
    ln1, = plt.plot(x, p_run(), label=u"n=0")
    ln2, = plt.plot(x, run(10), label=u"n=10")
    plt.xlabel(u'情节', fontproperties=font1)
    plt.ylabel(u'累计奖励', fontproperties=font1)
    plt.legend()
    plt.show()

