强化学习实战(一):策略迭代算法实现机器人快速寻找金币

简介: 强化学习实战(一):策略迭代算法实现机器人快速寻找金币

一、构建机器人寻找金币的环境


构建机器人寻找金币的环境需要编写四个主要函数:


reset():利用均匀随机分布初始化智能体的状态;


render():扮演图像引擎的角色,渲染,显示环境中物体的图像;


step():扮演物理引擎的角色,模拟环境中物体的运动规律;


transform():状态转换,获得下一个状态,立即汇报,是否终止和调试项


在编写render()时,我们可以通过gym库中的classic_control目录下的redering模块绘制环境,主要利用rendering模块中的make_circle()、Line()、Transfrom()等函数或类绘制环境,代码如下:


import logging
import random
import numpy as np  
import gym 
import ptvsd
ptvsd.enable_attach(address = ('0.0.0.0', 5678))
ptvsd.wait_for_attach()
logger=logging.getLogger(__name__)
class GridEnv(gym.Env):
    metadata={
        'render.modes':['human','rgb_array'],
        'video.frames_per_second':2
    }
    def __init__(self):
        #机器人初始化状态
        self.x=[140,220,300,380,460,140,300,460]
        self.y=[250,250,250,250,250,150,150,150]
        #终止状态
        self.terminate_states=dict()
        self.terminate_states[6]=1
        self.terminate_states[7]=1
        self.terminate_states[8]=1
        #状态空间
        self.states=[1,2,3,4,5,6,7,8]
        #动作空间
        self.actions=['n','e','w','s']
        #回报函数
        self.rewards=dict()
        self.rewards['1_s']=-1.0
        self.rewards['3_s']=1.0
        self.rewards['5_s']=-1.0
        #状态转移概率
        self.t=dict()
        self.t['1_e']=2
        self.t['1_s']=6
        self.t['2_w']=1
        self.t['2_e']=3
        self.t['3_w']=2
        self.t['3_e']=4
        self.t['3_s']=7
        self.t['4_w']=3
        self.t['4_e']=5
        self.t['5_w']=4
        self.t['5_s']=8
        #折扣因子
        self.gamma=0.8
        #显示器
        self.viewer=None
        #状态
        self.state=None
        #在类对象内部访问实例属性
        #获取终止状态
    #返回下一步状态、立即回报和状态转移概率
    def transform(self,state,action):
        #遍历动作空间,当不在状态转移概率中时,该状态设为-1
        s=-1
        r=0
        key='%i_%s'%(state,action)
        if key in self.rewards:
            r=self.rewards[key]
        if key in self.t:
            s=self.t[key]
        return self.t,s,r
    def getTerminal(self):
        return self.terminate_states
    #获取状态空间
    def getStates(self):
        return self.states
    #获取动作空间
    def getActions(self):
        return self.actions
    #获取折扣因子
    def getGamma(self):
        return self.gamma
    #定义reset()函数
    def reset(self):
        self.state=self.states[int(random.random()*len(self.states))]
        print("hello world")
        return self.state
    #定义step():扮演物理引擎的角色,物理引模拟环境中物体的运动规律
    def step(self,action):
        #系统当前状态
        state=self.state
        #判断当前状态是否处于终止状态
        if state in self.terminate_states:
            return state,0,True,{}
        #'定义的格式化字符串'%实际值
        #当定义的格式化字符串中包含两个以上占位符时,必须将所有实际值封装在元组中
        key='%i_%s'%(state,action)
        #状态转移
        if key in self.t:
            next_state=self.t[key]
        else:
            next_state=state
        #系统当前状态
        self.state=next_state
        is_terminal=False
        if next_state in self.terminate_states:
            is_terminal=True
        if key not in self.rewards:
            r=0.0
        else:
            r=self.rewards[key]
        return next_state,r,is_terminal,{}
    #定义render():扮演图像引擎的角色,图像引擎显示环境中物体的图像
    def render(self,mode='human',close=False):
        if close==True:
            if self.viewer is not None:
                self.viewer.close()
                self.viewer=None
            return
        screen_width=600
        screen_height=400
        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer=rendering.Viewer(screen_width,screen_height)
            #创建网格世界,一共11条直线
            self.line1=rendering.Line((100,300),(500,300))
            self.line2=rendering.Line((100,200),(500,200))
            self.line3=rendering.Line((100,100),(100,300))
            self.line4=rendering.Line((180,100),(180,300))
            self.line5=rendering.Line((260,100),(260,300))
            self.line6=rendering.Line((340,100),(340,300))
            self.line7=rendering.Line((420,100),(420,300))
            self.line8=rendering.Line((500,100),(500,300))
            self.line9=rendering.Line((100,100),(180,100))
            self.line10=rendering.Line((260,100),(340,100))
            self.line11=rendering.Line((420,100),(500,100))
            #创建死亡区域
            #画圆,半径为40
            self.kulo1=rendering.make_circle(40)
            #圆心为(140,150)
            #创建第一个骷髅
            self.circletrans=rendering.Transform((140,150))
            self.kulo1.add_attr(self.circletrans)
            self.kulo1.set_color(0,0,0)
            #创建第二个骷髅
            self.kulo2=rendering.make_circle(40)
            self.circletrans=rendering.Transform((460,150))
            self.kulo2.add_attr(self.circletrans)
            self.kulo2.set_color(0,0,0)
            #创建金币区域
            self.gold=rendering.make_circle(40)
            self.circletrans=rendering.Transform((300,150))
            self.gold.add_attr(self.circletrans)
            self.gold.set_color(1,0.9,0)
            #创建机器人
            self.robot=rendering.make_circle(30)
            self.robotrans=rendering.Transform()
            self.robot.add_attr(self.robotrans)
            self.robot.set_color(0.8,0.6,0.4)
            #设置颜色并将对象添加到几何中
            self.line1.set_color(0,0,0)
            self.line2.set_color(0,0,0)
            self.line3.set_color(0,0,0)
            self.line4.set_color(0,0,0)
            self.line5.set_color(0,0,0)
            self.line6.set_color(0,0,0)
            self.line7.set_color(0,0,0)
            self.line8.set_color(0,0,0)
            self.line9.set_color(0,0,0)
            self.line10.set_color(0,0,0)
            self.line11.set_color(0,0,0)
            self.viewer.add_geom(self.line1)
            self.viewer.add_geom(self.line2)
            self.viewer.add_geom(self.line3)
            self.viewer.add_geom(self.line4)
            self.viewer.add_geom(self.line5)
            self.viewer.add_geom(self.line6)
            self.viewer.add_geom(self.line7)
            self.viewer.add_geom(self.line8)
            self.viewer.add_geom(self.line9)
            self.viewer.add_geom(self.line10)
            self.viewer.add_geom(self.line11)
            self.viewer.add_geom(self.kulo1)
            self.viewer.add_geom(self.kulo2)
            self.viewer.add_geom(self.gold)
            self.viewer.add_geom(self.robot)
        if self.state is None:
            return None
        #设置机器人圆心坐标
        self.robotrans.set_translation(self.x[self.state-1],self.y[self.state-1])
        return self.viewer.render('rgb_array')


二、测试构建好的环境:


当环境构建好以后需要注册环境,以便通过gym的标准形式调用该环境。环境注册需要三步:


1、将环境文件拷贝到gym安装目录gym/gym/envs/classic_control目录中,拷贝到此目录中的原因是要使用rendering模块绘制环境图像;


2、打开此目录中的初始化文件__init__.py,在文件末尾插入语句:


from gym.envs.classic_control.grid_mdp GridEnv


3、进入上一级目录gym/gym/envs中,打开此目录中的初始化文件__init__.py,在文件末尾插入语句:

register (
    id='GridWorld-v0',  
    entry_point='gym.envs.classic_control:GridEnv',
    max_episode_steps=200,
    reward_threshold=100.0,
)


环境构建好以后,我们就来测试一下环境。


测试代码如下所示:


import gym
import time
env=gym.make('GridWorld-v0')
env.reset()
env.render()
time.sleep(3)


运行测试程序看到如下环境证明环境构建成功:


ab718343cb58702ac176dd47c3cac4f.png


PS:环境中黑色圆代表陷阱,蓝色圆代表金币,绿色圆代表机器人。


三、策略迭代算法寻找最优策略:


现在,我们需要让机器人在此环境中以最快的速度寻找到金币,也就是寻找一个最优策略使得机器人寻找到金币的用时最短,而最优策略就是一组动作,且每一个动作在当前状态下都是最优动作,即当前状态下的最优状态动作值函数所对应的动作。由于机器人寻找金币的问题是一个马尔可夫决策过程问题,而马尔可夫决策过程问题符合使用动态


规划求解问题的两个条件:


1、整个优化问题可以分解为多个子优化问题:在MDP中,最优策略可以分解为一组最优动作;


2、子优化问题的解可以被存储和重复利用:在MDP中,最优动作的解为最优状态动作值函数,而最优状态动作值函数等于当前状态下的最优值函数;


因此,可以使用动态规划求解MDP问题,而动态规划有两种方法:



策略迭代算法:包含策略评估和策略改善。


在策略评估中,迭代计算每个状态的状态值函数直到达到当前策略的真实状态值函数,目的是为了更好的评估当前策略的价值;


在策略改善中,在每个状态下采用贪婪策略(确定性策略)以更新当前策略。


策略评估和策略改善交替进行,直到策略不变为止。


值迭代算法:包含策略评估和策略改善。


在策略评估中只迭代一次,计算初始策略下每个状态的状态值函数。


在策略改善中,在每个状态下采用贪婪策略,将最优状态动作值函数作为当前状态的状态值函数,因此策略改善迭代的是状态值函数,直到状态值函数不变为止。


值迭代算法策略评估和策略改善没有交替进行,只是循环策略改善部分迭代状态值函数。


由于策略迭代算法比值迭代算法迭代收敛速度更快,因此本例采用策略迭代算法寻找最优策略。


代码如下所示(代码中每行都有注释,以方便大家理解):


/

import gym 
import random
import time
env=gym.make('GridWorld-v0')
class Learn:
    def __init__(self,grid_mdp):
        #初始化状态值函数
        self.v=dict()
        for state in grid_mdp.states:
            self.v[state]=0
        #初始化策略,这些策略均在状态转移概率矩阵中
        self.pi=dict()
        #random.choice(seq):返回列表、元组、字符串的随机项
        self.pi[1]=random.choice(['e','s'])
        self.pi[2]=random.choice(['e','w'])
        self.pi[3]=random.choice(['w','s','e'])
        self.pi[4]=random.choice(['e','w'])
        self.pi[5]=random.choice(['w','s'])
    #策略迭代函数
    def policy_iterate(self,grid_mdp):
        #迭代100次直到策略不变为止
        for i in range(100):
            #策略评估和策略改善交替进行
            self.policy_evaluate(grid_mdp)
            self.policy_improve(grid_mdp)
    #策略评估:
    def policy_evaluate(self,grid_mdp):
        #迭代1000次计算每个状态的真实状态值函数
        for i in range(1000):
            delta=0.0
            #遍历状态空间
            for state in grid_mdp.states:
                #终止状态不用计算状态值函数(v=0.0)
                if state in grid_mdp.terminate_states:
                    continue
                action=self.pi[state]
                t,s,r=grid_mdp.transform(state,action)
                #if s!=-1:
                new_v=r+grid_mdp.gamma*self.v[s]
                delta+=abs(new_v-self.v[state])
                #更新状态值函数
                self.v[state]=new_v
            if delta < 1e-6:
                break
    #策略改善:遍历动作空间,寻找最优动作
    def policy_improve(self,grid_mdp):
        #在每个状态下采用贪婪策略
        for state in grid_mdp.states:
            #终止状态不用计算状态值函数(v=0.0)和求最优策略
            if state in grid_mdp.terminate_states:
                continue
            #假设当前策略为最优动作
            a1=self.pi[state]
            t,s,r=grid_mdp.transform(state,a1)
            #当不在状态转移概率中时,状态动作值函数不存在,状态值函数不变
            #if s!=-1:
            #当前策略下最优状态动作值函数为最优状态值函数
            v1=r+grid_mdp.gamma*self.v[s]
            #遍历动作空间与最优动作进行比较,从而找到最优动作
            for action in grid_mdp.actions:
                #当不在状态转移概率中时,状态动作值函数不存在,状态值函数不变
                t,s,r=grid_mdp.transform(state,action)
                if s!=-1:
                    if v1 < r+grid_mdp.gamma*self.v[s]:
                        a1=action
                        v1=r+grid_mdp.gamma*self.v[s]
            #更新策略
            self.pi[state]=a1
    #最优动作
    def action(self,state):
        return self.pi[state]
gm=env.env
#初始化智能体的状态
state=env.reset()
#实例化对象,获得初始化状态值函数和初始化策略
learn=Learn(gm)
#策略评估和策略改善
learn.policy_iterate(gm)
total_reward=0
#最多走100步到达终止状态
for i in range(100):
    env.render()
    #每个状态的策略都是最优策略
    action=learn.action(state)
    #每一步按照最优策略走
    state,reward,done,_=env.step(action)
    total_reward+=reward 
    time.sleep(1)
    if done:
        #显示环境中物体进入终止状态的图像
        env.render()
        break


由于机器人每一步采取的都是最优策略,因此我们会看到机器人以最短时间寻找到了金币,具体gif动图就不插入了,大家可以运行上述代码看到最终效果,这就是通过策略迭代算法实现机器人寻找金币的案例!


相关文章
|
15天前
|
存储 算法 调度
【复现】【遗传算法】考虑储能和可再生能源消纳责任制的售电公司购售电策略(Python代码实现)
【复现】【遗传算法】考虑储能和可再生能源消纳责任制的售电公司购售电策略(Python代码实现)
121 26
|
2月前
|
机器学习/深度学习 算法 文件存储
神经架构搜索NAS详解:三种核心算法原理与Python实战代码
神经架构搜索(NAS)正被广泛应用于大模型及语言/视觉模型设计,如LangVision-LoRA-NAS、Jet-Nemotron等。本文回顾NAS核心技术,解析其自动化设计原理,探讨强化学习、进化算法与梯度方法的应用与差异,揭示NAS在大模型时代的潜力与挑战。
322 6
神经架构搜索NAS详解:三种核心算法原理与Python实战代码
|
20天前
|
存储 并行计算 算法
【动态多目标优化算法】基于自适应启动策略的混合交叉动态约束多目标优化算法(MC-DCMOEA)求解CEC2023研究(Matlab代码实现)
【动态多目标优化算法】基于自适应启动策略的混合交叉动态约束多目标优化算法(MC-DCMOEA)求解CEC2023研究(Matlab代码实现)
|
23天前
|
机器学习/深度学习 资源调度 算法
遗传算法模型深度解析与实战应用
摘要 遗传算法(GA)作为一种受生物进化启发的优化算法,在复杂问题求解中展现出独特优势。本文系统介绍了GA的核心理论、实现细节和应用经验。算法通过模拟自然选择机制,利用选择、交叉、变异三大操作在解空间中进行全局搜索。与梯度下降等传统方法相比,GA不依赖目标函数的连续性或可微性,特别适合处理离散优化、多目标优化等复杂问题。文中详细阐述了染色体编码、适应度函数设计、遗传操作实现等关键技术,并提供了Python代码实现示例。实践表明,GA的成功应用关键在于平衡探索与开发,通过精心调参维持种群多样性同时确保收敛效率
|
23天前
|
机器学习/深度学习 边缘计算 人工智能
粒子群算法模型深度解析与实战应用
蒋星熠Jaxonic是一位深耕智能优化算法领域多年的技术探索者,专注于粒子群优化(PSO)算法的研究与应用。他深入剖析了PSO的数学模型、核心公式及实现方法,并通过大量实践验证了其在神经网络优化、工程设计等复杂问题上的卓越性能。本文全面展示了PSO的理论基础、改进策略与前沿发展方向,为读者提供了一份详尽的技术指南。
粒子群算法模型深度解析与实战应用
|
2月前
|
机器学习/深度学习 算法 数据可视化
近端策略优化算法PPO的核心概念和PyTorch实现详解
本文深入解析了近端策略优化(PPO)算法的核心原理,并基于PyTorch框架实现了完整的强化学习训练流程。通过Lunar Lander环境展示了算法的全过程,涵盖环境交互、优势函数计算、策略更新等关键模块。内容理论与实践结合,适合希望掌握PPO算法及其实现的读者。
308 2
近端策略优化算法PPO的核心概念和PyTorch实现详解
|
1月前
|
运维 算法 安全
基于变异粒子群算法的主动配电网故障恢复策略(Matlab代码实现)
基于变异粒子群算法的主动配电网故障恢复策略(Matlab代码实现)
|
2月前
|
传感器 算法 安全
【路径规划】基于matlab A_Star结合DWA算法电气设备巡检机器人路径规划研究(Matlab代码实现)
【路径规划】基于matlab A_Star结合DWA算法电气设备巡检机器人路径规划研究(Matlab代码实现)
|
3月前
|
存储 监控 算法
基于 Python 跳表算法的局域网网络监控软件动态数据索引优化策略研究
局域网网络监控软件需高效处理终端行为数据,跳表作为一种基于概率平衡的动态数据结构,具备高效的插入、删除与查询性能(平均时间复杂度为O(log n)),适用于高频数据写入和随机查询场景。本文深入解析跳表原理,探讨其在局域网监控中的适配性,并提供基于Python的完整实现方案,优化终端会话管理,提升系统响应性能。
87 4
|
4月前
|
机器学习/深度学习 算法 数据可视化
基于Qlearning强化学习的机器人迷宫路线搜索算法matlab仿真
本内容展示了基于Q-learning算法的机器人迷宫路径搜索仿真及其实现过程。通过Matlab2022a进行仿真,结果以图形形式呈现,无水印(附图1-4)。算法理论部分介绍了Q-learning的核心概念,包括智能体、环境、状态、动作和奖励,以及Q表的构建与更新方法。具体实现中,将迷宫抽象为二维网格世界,定义起点和终点,利用Q-learning训练机器人找到最优路径。核心程序代码实现了多轮训练、累计奖励值与Q值的可视化,并展示了机器人从起点到终点的路径规划过程。
133 0

热门文章

最新文章