强化学习实战(一):策略迭代算法实现机器人快速寻找金币

简介: 强化学习实战(一):策略迭代算法实现机器人快速寻找金币

一、构建机器人寻找金币的环境


构建机器人寻找金币的环境需要编写四个主要函数:


reset():利用均匀随机分布初始化智能体的状态;


render():扮演图像引擎的角色,渲染,显示环境中物体的图像;


step():扮演物理引擎的角色,模拟环境中物体的运动规律;


transform():状态转换,获得下一个状态,立即汇报,是否终止和调试项


在编写render()时,我们可以通过gym库中的classic_control目录下的redering模块绘制环境,主要利用rendering模块中的make_circle()、Line()、Transfrom()等函数或类绘制环境,代码如下:


import logging
import random
import numpy as np  
import gym 
import ptvsd
ptvsd.enable_attach(address = ('0.0.0.0', 5678))
ptvsd.wait_for_attach()
logger=logging.getLogger(__name__)
class GridEnv(gym.Env):
    metadata={
        'render.modes':['human','rgb_array'],
        'video.frames_per_second':2
    }
    def __init__(self):
        #机器人初始化状态
        self.x=[140,220,300,380,460,140,300,460]
        self.y=[250,250,250,250,250,150,150,150]
        #终止状态
        self.terminate_states=dict()
        self.terminate_states[6]=1
        self.terminate_states[7]=1
        self.terminate_states[8]=1
        #状态空间
        self.states=[1,2,3,4,5,6,7,8]
        #动作空间
        self.actions=['n','e','w','s']
        #回报函数
        self.rewards=dict()
        self.rewards['1_s']=-1.0
        self.rewards['3_s']=1.0
        self.rewards['5_s']=-1.0
        #状态转移概率
        self.t=dict()
        self.t['1_e']=2
        self.t['1_s']=6
        self.t['2_w']=1
        self.t['2_e']=3
        self.t['3_w']=2
        self.t['3_e']=4
        self.t['3_s']=7
        self.t['4_w']=3
        self.t['4_e']=5
        self.t['5_w']=4
        self.t['5_s']=8
        #折扣因子
        self.gamma=0.8
        #显示器
        self.viewer=None
        #状态
        self.state=None
        #在类对象内部访问实例属性
        #获取终止状态
    #返回下一步状态、立即回报和状态转移概率
    def transform(self,state,action):
        #遍历动作空间,当不在状态转移概率中时,该状态设为-1
        s=-1
        r=0
        key='%i_%s'%(state,action)
        if key in self.rewards:
            r=self.rewards[key]
        if key in self.t:
            s=self.t[key]
        return self.t,s,r
    def getTerminal(self):
        return self.terminate_states
    #获取状态空间
    def getStates(self):
        return self.states
    #获取动作空间
    def getActions(self):
        return self.actions
    #获取折扣因子
    def getGamma(self):
        return self.gamma
    #定义reset()函数
    def reset(self):
        self.state=self.states[int(random.random()*len(self.states))]
        print("hello world")
        return self.state
    #定义step():扮演物理引擎的角色,物理引模拟环境中物体的运动规律
    def step(self,action):
        #系统当前状态
        state=self.state
        #判断当前状态是否处于终止状态
        if state in self.terminate_states:
            return state,0,True,{}
        #'定义的格式化字符串'%实际值
        #当定义的格式化字符串中包含两个以上占位符时,必须将所有实际值封装在元组中
        key='%i_%s'%(state,action)
        #状态转移
        if key in self.t:
            next_state=self.t[key]
        else:
            next_state=state
        #系统当前状态
        self.state=next_state
        is_terminal=False
        if next_state in self.terminate_states:
            is_terminal=True
        if key not in self.rewards:
            r=0.0
        else:
            r=self.rewards[key]
        return next_state,r,is_terminal,{}
    #定义render():扮演图像引擎的角色,图像引擎显示环境中物体的图像
    def render(self,mode='human',close=False):
        if close==True:
            if self.viewer is not None:
                self.viewer.close()
                self.viewer=None
            return
        screen_width=600
        screen_height=400
        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer=rendering.Viewer(screen_width,screen_height)
            #创建网格世界,一共11条直线
            self.line1=rendering.Line((100,300),(500,300))
            self.line2=rendering.Line((100,200),(500,200))
            self.line3=rendering.Line((100,100),(100,300))
            self.line4=rendering.Line((180,100),(180,300))
            self.line5=rendering.Line((260,100),(260,300))
            self.line6=rendering.Line((340,100),(340,300))
            self.line7=rendering.Line((420,100),(420,300))
            self.line8=rendering.Line((500,100),(500,300))
            self.line9=rendering.Line((100,100),(180,100))
            self.line10=rendering.Line((260,100),(340,100))
            self.line11=rendering.Line((420,100),(500,100))
            #创建死亡区域
            #画圆,半径为40
            self.kulo1=rendering.make_circle(40)
            #圆心为(140,150)
            #创建第一个骷髅
            self.circletrans=rendering.Transform((140,150))
            self.kulo1.add_attr(self.circletrans)
            self.kulo1.set_color(0,0,0)
            #创建第二个骷髅
            self.kulo2=rendering.make_circle(40)
            self.circletrans=rendering.Transform((460,150))
            self.kulo2.add_attr(self.circletrans)
            self.kulo2.set_color(0,0,0)
            #创建金币区域
            self.gold=rendering.make_circle(40)
            self.circletrans=rendering.Transform((300,150))
            self.gold.add_attr(self.circletrans)
            self.gold.set_color(1,0.9,0)
            #创建机器人
            self.robot=rendering.make_circle(30)
            self.robotrans=rendering.Transform()
            self.robot.add_attr(self.robotrans)
            self.robot.set_color(0.8,0.6,0.4)
            #设置颜色并将对象添加到几何中
            self.line1.set_color(0,0,0)
            self.line2.set_color(0,0,0)
            self.line3.set_color(0,0,0)
            self.line4.set_color(0,0,0)
            self.line5.set_color(0,0,0)
            self.line6.set_color(0,0,0)
            self.line7.set_color(0,0,0)
            self.line8.set_color(0,0,0)
            self.line9.set_color(0,0,0)
            self.line10.set_color(0,0,0)
            self.line11.set_color(0,0,0)
            self.viewer.add_geom(self.line1)
            self.viewer.add_geom(self.line2)
            self.viewer.add_geom(self.line3)
            self.viewer.add_geom(self.line4)
            self.viewer.add_geom(self.line5)
            self.viewer.add_geom(self.line6)
            self.viewer.add_geom(self.line7)
            self.viewer.add_geom(self.line8)
            self.viewer.add_geom(self.line9)
            self.viewer.add_geom(self.line10)
            self.viewer.add_geom(self.line11)
            self.viewer.add_geom(self.kulo1)
            self.viewer.add_geom(self.kulo2)
            self.viewer.add_geom(self.gold)
            self.viewer.add_geom(self.robot)
        if self.state is None:
            return None
        #设置机器人圆心坐标
        self.robotrans.set_translation(self.x[self.state-1],self.y[self.state-1])
        return self.viewer.render('rgb_array')


二、测试构建好的环境:


当环境构建好以后需要注册环境,以便通过gym的标准形式调用该环境。环境注册需要三步:


1、将环境文件拷贝到gym安装目录gym/gym/envs/classic_control目录中,拷贝到此目录中的原因是要使用rendering模块绘制环境图像;


2、打开此目录中的初始化文件__init__.py,在文件末尾插入语句:


from gym.envs.classic_control.grid_mdp GridEnv


3、进入上一级目录gym/gym/envs中,打开此目录中的初始化文件__init__.py,在文件末尾插入语句:

register (
    id='GridWorld-v0',  
    entry_point='gym.envs.classic_control:GridEnv',
    max_episode_steps=200,
    reward_threshold=100.0,
)


环境构建好以后,我们就来测试一下环境。


测试代码如下所示:


import gym
import time
env=gym.make('GridWorld-v0')
env.reset()
env.render()
time.sleep(3)


运行测试程序看到如下环境证明环境构建成功:


ab718343cb58702ac176dd47c3cac4f.png


PS:环境中黑色圆代表陷阱,蓝色圆代表金币,绿色圆代表机器人。


三、策略迭代算法寻找最优策略:


现在,我们需要让机器人在此环境中以最快的速度寻找到金币,也就是寻找一个最优策略使得机器人寻找到金币的用时最短,而最优策略就是一组动作,且每一个动作在当前状态下都是最优动作,即当前状态下的最优状态动作值函数所对应的动作。由于机器人寻找金币的问题是一个马尔可夫决策过程问题,而马尔可夫决策过程问题符合使用动态


规划求解问题的两个条件:


1、整个优化问题可以分解为多个子优化问题:在MDP中,最优策略可以分解为一组最优动作;


2、子优化问题的解可以被存储和重复利用:在MDP中,最优动作的解为最优状态动作值函数,而最优状态动作值函数等于当前状态下的最优值函数;


因此,可以使用动态规划求解MDP问题,而动态规划有两种方法:



策略迭代算法:包含策略评估和策略改善。


在策略评估中,迭代计算每个状态的状态值函数直到达到当前策略的真实状态值函数,目的是为了更好的评估当前策略的价值;


在策略改善中,在每个状态下采用贪婪策略(确定性策略)以更新当前策略。


策略评估和策略改善交替进行,直到策略不变为止。


值迭代算法:包含策略评估和策略改善。


在策略评估中只迭代一次,计算初始策略下每个状态的状态值函数。


在策略改善中,在每个状态下采用贪婪策略,将最优状态动作值函数作为当前状态的状态值函数,因此策略改善迭代的是状态值函数,直到状态值函数不变为止。


值迭代算法策略评估和策略改善没有交替进行,只是循环策略改善部分迭代状态值函数。


由于策略迭代算法比值迭代算法迭代收敛速度更快,因此本例采用策略迭代算法寻找最优策略。


代码如下所示(代码中每行都有注释,以方便大家理解):


/

import gym 
import random
import time
env=gym.make('GridWorld-v0')
class Learn:
    def __init__(self,grid_mdp):
        #初始化状态值函数
        self.v=dict()
        for state in grid_mdp.states:
            self.v[state]=0
        #初始化策略,这些策略均在状态转移概率矩阵中
        self.pi=dict()
        #random.choice(seq):返回列表、元组、字符串的随机项
        self.pi[1]=random.choice(['e','s'])
        self.pi[2]=random.choice(['e','w'])
        self.pi[3]=random.choice(['w','s','e'])
        self.pi[4]=random.choice(['e','w'])
        self.pi[5]=random.choice(['w','s'])
    #策略迭代函数
    def policy_iterate(self,grid_mdp):
        #迭代100次直到策略不变为止
        for i in range(100):
            #策略评估和策略改善交替进行
            self.policy_evaluate(grid_mdp)
            self.policy_improve(grid_mdp)
    #策略评估:
    def policy_evaluate(self,grid_mdp):
        #迭代1000次计算每个状态的真实状态值函数
        for i in range(1000):
            delta=0.0
            #遍历状态空间
            for state in grid_mdp.states:
                #终止状态不用计算状态值函数(v=0.0)
                if state in grid_mdp.terminate_states:
                    continue
                action=self.pi[state]
                t,s,r=grid_mdp.transform(state,action)
                #if s!=-1:
                new_v=r+grid_mdp.gamma*self.v[s]
                delta+=abs(new_v-self.v[state])
                #更新状态值函数
                self.v[state]=new_v
            if delta < 1e-6:
                break
    #策略改善:遍历动作空间,寻找最优动作
    def policy_improve(self,grid_mdp):
        #在每个状态下采用贪婪策略
        for state in grid_mdp.states:
            #终止状态不用计算状态值函数(v=0.0)和求最优策略
            if state in grid_mdp.terminate_states:
                continue
            #假设当前策略为最优动作
            a1=self.pi[state]
            t,s,r=grid_mdp.transform(state,a1)
            #当不在状态转移概率中时,状态动作值函数不存在,状态值函数不变
            #if s!=-1:
            #当前策略下最优状态动作值函数为最优状态值函数
            v1=r+grid_mdp.gamma*self.v[s]
            #遍历动作空间与最优动作进行比较,从而找到最优动作
            for action in grid_mdp.actions:
                #当不在状态转移概率中时,状态动作值函数不存在,状态值函数不变
                t,s,r=grid_mdp.transform(state,action)
                if s!=-1:
                    if v1 < r+grid_mdp.gamma*self.v[s]:
                        a1=action
                        v1=r+grid_mdp.gamma*self.v[s]
            #更新策略
            self.pi[state]=a1
    #最优动作
    def action(self,state):
        return self.pi[state]
gm=env.env
#初始化智能体的状态
state=env.reset()
#实例化对象,获得初始化状态值函数和初始化策略
learn=Learn(gm)
#策略评估和策略改善
learn.policy_iterate(gm)
total_reward=0
#最多走100步到达终止状态
for i in range(100):
    env.render()
    #每个状态的策略都是最优策略
    action=learn.action(state)
    #每一步按照最优策略走
    state,reward,done,_=env.step(action)
    total_reward+=reward 
    time.sleep(1)
    if done:
        #显示环境中物体进入终止状态的图像
        env.render()
        break


由于机器人每一步采取的都是最优策略,因此我们会看到机器人以最短时间寻找到了金币,具体gif动图就不插入了,大家可以运行上述代码看到最终效果,这就是通过策略迭代算法实现机器人寻找金币的案例!


相关文章
|
1月前
|
机器学习/深度学习 算法 机器人
多代理强化学习综述:原理、算法与挑战
多代理强化学习是强化学习的一个子领域,专注于研究在共享环境中共存的多个学习代理的行为。每个代理都受其个体奖励驱动,采取行动以推进自身利益;在某些环境中,这些利益可能与其他代理的利益相冲突,从而产生复杂的群体动态。
181 5
|
14天前
|
机器学习/深度学习 监控 机器人
量化交易机器人系统开发逻辑策略及源码示例
量化交易机器人是一种通过编程实现自动化交易决策的金融工具。其开发流程包括需求分析、系统设计、开发实现、测试优化、部署上线、风险管理及数据分析。示例中展示了使用Python实现的简单双均线策略,计算交易信号并输出累计收益率。
|
13天前
|
机器学习/深度学习 监控 算法
现货量化交易机器人系统开发策略逻辑及源码示例
现货量化交易机器人系统是一种基于计算机算法和数据分析的自动化交易工具。该系统通过制定交易策略、获取和处理数据、生成交易信号、执行交易操作和控制风险等环节,实现高效、精准的交易决策。系统架构可采用分布式或集中式,以满足不同需求。文中还提供了一个简单的双均线策略Python代码示例。
|
27天前
|
数据采集 缓存 算法
算法优化的常见策略有哪些
【10月更文挑战第20天】算法优化的常见策略有哪些
|
2月前
|
资源调度 算法
基于迭代扩展卡尔曼滤波算法的倒立摆控制系统matlab仿真
本课题研究基于迭代扩展卡尔曼滤波算法的倒立摆控制系统,并对比UKF、EKF、迭代UKF和迭代EKF的控制效果。倒立摆作为典型的非线性系统,适用于评估不同滤波方法的性能。UKF采用无迹变换逼近非线性函数,避免了EKF中的截断误差;EKF则通过泰勒级数展开近似非线性函数;迭代EKF和迭代UKF通过多次迭代提高状态估计精度。系统使用MATLAB 2022a进行仿真和分析,结果显示UKF和迭代UKF在非线性强的系统中表现更佳,但计算复杂度较高;EKF和迭代EKF则更适合维数较高或计算受限的场景。
|
3月前
|
机器学习/深度学习 算法 TensorFlow
深入探索强化学习与深度学习的融合:使用TensorFlow框架实现深度Q网络算法及高效调试技巧
【8月更文挑战第31天】强化学习是机器学习的重要分支,尤其在深度学习的推动下,能够解决更为复杂的问题。深度Q网络(DQN)结合了深度学习与强化学习的优势,通过神经网络逼近动作价值函数,在多种任务中表现出色。本文探讨了使用TensorFlow实现DQN算法的方法及其调试技巧。DQN通过神经网络学习不同状态下采取动作的预期回报Q(s,a),处理高维状态空间。
55 1
|
3月前
|
机器学习/深度学习 存储 算法
强化学习实战:基于 PyTorch 的环境搭建与算法实现
【8月更文第29天】强化学习是机器学习的一个重要分支,它让智能体通过与环境交互来学习策略,以最大化长期奖励。本文将介绍如何使用PyTorch实现两种经典的强化学习算法——Deep Q-Network (DQN) 和 Actor-Critic Algorithm with Asynchronous Advantage (A3C)。我们将从环境搭建开始,逐步实现算法的核心部分,并给出完整的代码示例。
257 1
|
3月前
|
缓存 算法 前端开发
深入理解缓存淘汰策略:LRU和LFU算法的解析与应用
【8月更文挑战第25天】在计算机科学领域,高效管理资源对于提升系统性能至关重要。内存缓存作为一种加速数据读取的有效方法,其管理策略直接影响整体性能。本文重点介绍两种常用的缓存淘汰算法:LRU(最近最少使用)和LFU(最不经常使用)。LRU算法依据数据最近是否被访问来进行淘汰决策;而LFU算法则根据数据的访问频率做出判断。这两种算法各有特点,适用于不同的应用场景。通过深入分析这两种算法的原理、实现方式及适用场景,本文旨在帮助开发者更好地理解缓存管理机制,从而在实际应用中作出更合理的选择,有效提升系统性能和用户体验。
191 1
|
3月前
|
算法 语音技术
支付宝商业化广告算法问题之在ODL模型优化过程中,采取什么策略来提高模型的泛化能力呢
支付宝商业化广告算法问题之在ODL模型优化过程中,采取什么策略来提高模型的泛化能力呢
|
3月前
|
测试技术 数据库
探索JSF单元测试秘籍!如何让您的应用更稳固、更高效?揭秘成功背后的测试之道!
【8月更文挑战第31天】在 JavaServer Faces(JSF)应用开发中,确保代码质量和可维护性至关重要。本文详细介绍了如何通过单元测试实现这一目标。首先,阐述了单元测试的重要性及其对应用稳定性的影响;其次,提出了提高 JSF 应用可测试性的设计建议,如避免直接访问外部资源和使用依赖注入;最后,通过一个具体的 `UserBean` 示例,展示了如何利用 JUnit 和 Mockito 框架编写有效的单元测试。通过这些方法,不仅能够确保代码质量,还能提高开发效率和降低维护成本。
52 0

热门文章

最新文章