一、构建机器人寻找金币的环境
构建机器人寻找金币的环境需要编写四个主要函数:
reset():利用均匀随机分布初始化智能体的状态;
render():扮演图像引擎的角色,渲染,显示环境中物体的图像;
step():扮演物理引擎的角色,模拟环境中物体的运动规律;
transform():状态转换,获得下一个状态,立即汇报,是否终止和调试项
在编写render()时,我们可以通过gym库中的classic_control目录下的redering模块绘制环境,主要利用rendering模块中的make_circle()、Line()、Transfrom()等函数或类绘制环境,代码如下:
import logging import random import numpy as np import gym import ptvsd ptvsd.enable_attach(address = ('0.0.0.0', 5678)) ptvsd.wait_for_attach() logger=logging.getLogger(__name__) class GridEnv(gym.Env): metadata={ 'render.modes':['human','rgb_array'], 'video.frames_per_second':2 } def __init__(self): #机器人初始化状态 self.x=[140,220,300,380,460,140,300,460] self.y=[250,250,250,250,250,150,150,150] #终止状态 self.terminate_states=dict() self.terminate_states[6]=1 self.terminate_states[7]=1 self.terminate_states[8]=1 #状态空间 self.states=[1,2,3,4,5,6,7,8] #动作空间 self.actions=['n','e','w','s'] #回报函数 self.rewards=dict() self.rewards['1_s']=-1.0 self.rewards['3_s']=1.0 self.rewards['5_s']=-1.0 #状态转移概率 self.t=dict() self.t['1_e']=2 self.t['1_s']=6 self.t['2_w']=1 self.t['2_e']=3 self.t['3_w']=2 self.t['3_e']=4 self.t['3_s']=7 self.t['4_w']=3 self.t['4_e']=5 self.t['5_w']=4 self.t['5_s']=8 #折扣因子 self.gamma=0.8 #显示器 self.viewer=None #状态 self.state=None #在类对象内部访问实例属性 #获取终止状态 #返回下一步状态、立即回报和状态转移概率 def transform(self,state,action): #遍历动作空间,当不在状态转移概率中时,该状态设为-1 s=-1 r=0 key='%i_%s'%(state,action) if key in self.rewards: r=self.rewards[key] if key in self.t: s=self.t[key] return self.t,s,r def getTerminal(self): return self.terminate_states #获取状态空间 def getStates(self): return self.states #获取动作空间 def getActions(self): return self.actions #获取折扣因子 def getGamma(self): return self.gamma #定义reset()函数 def reset(self): self.state=self.states[int(random.random()*len(self.states))] print("hello world") return self.state #定义step():扮演物理引擎的角色,物理引模拟环境中物体的运动规律 def step(self,action): #系统当前状态 state=self.state #判断当前状态是否处于终止状态 if state in self.terminate_states: return state,0,True,{} #'定义的格式化字符串'%实际值 #当定义的格式化字符串中包含两个以上占位符时,必须将所有实际值封装在元组中 key='%i_%s'%(state,action) #状态转移 if key in self.t: next_state=self.t[key] else: next_state=state #系统当前状态 self.state=next_state is_terminal=False if next_state in self.terminate_states: is_terminal=True if key not in self.rewards: r=0.0 else: r=self.rewards[key] return next_state,r,is_terminal,{} #定义render():扮演图像引擎的角色,图像引擎显示环境中物体的图像 def render(self,mode='human',close=False): if close==True: if self.viewer is not None: self.viewer.close() self.viewer=None return screen_width=600 screen_height=400 if self.viewer is None: from gym.envs.classic_control import rendering self.viewer=rendering.Viewer(screen_width,screen_height) #创建网格世界,一共11条直线 self.line1=rendering.Line((100,300),(500,300)) self.line2=rendering.Line((100,200),(500,200)) self.line3=rendering.Line((100,100),(100,300)) self.line4=rendering.Line((180,100),(180,300)) self.line5=rendering.Line((260,100),(260,300)) self.line6=rendering.Line((340,100),(340,300)) self.line7=rendering.Line((420,100),(420,300)) self.line8=rendering.Line((500,100),(500,300)) self.line9=rendering.Line((100,100),(180,100)) self.line10=rendering.Line((260,100),(340,100)) self.line11=rendering.Line((420,100),(500,100)) #创建死亡区域 #画圆,半径为40 self.kulo1=rendering.make_circle(40) #圆心为(140,150) #创建第一个骷髅 self.circletrans=rendering.Transform((140,150)) self.kulo1.add_attr(self.circletrans) self.kulo1.set_color(0,0,0) #创建第二个骷髅 self.kulo2=rendering.make_circle(40) self.circletrans=rendering.Transform((460,150)) self.kulo2.add_attr(self.circletrans) self.kulo2.set_color(0,0,0) #创建金币区域 self.gold=rendering.make_circle(40) self.circletrans=rendering.Transform((300,150)) self.gold.add_attr(self.circletrans) self.gold.set_color(1,0.9,0) #创建机器人 self.robot=rendering.make_circle(30) self.robotrans=rendering.Transform() self.robot.add_attr(self.robotrans) self.robot.set_color(0.8,0.6,0.4) #设置颜色并将对象添加到几何中 self.line1.set_color(0,0,0) self.line2.set_color(0,0,0) self.line3.set_color(0,0,0) self.line4.set_color(0,0,0) self.line5.set_color(0,0,0) self.line6.set_color(0,0,0) self.line7.set_color(0,0,0) self.line8.set_color(0,0,0) self.line9.set_color(0,0,0) self.line10.set_color(0,0,0) self.line11.set_color(0,0,0) self.viewer.add_geom(self.line1) self.viewer.add_geom(self.line2) self.viewer.add_geom(self.line3) self.viewer.add_geom(self.line4) self.viewer.add_geom(self.line5) self.viewer.add_geom(self.line6) self.viewer.add_geom(self.line7) self.viewer.add_geom(self.line8) self.viewer.add_geom(self.line9) self.viewer.add_geom(self.line10) self.viewer.add_geom(self.line11) self.viewer.add_geom(self.kulo1) self.viewer.add_geom(self.kulo2) self.viewer.add_geom(self.gold) self.viewer.add_geom(self.robot) if self.state is None: return None #设置机器人圆心坐标 self.robotrans.set_translation(self.x[self.state-1],self.y[self.state-1]) return self.viewer.render('rgb_array')
二、测试构建好的环境:
当环境构建好以后需要注册环境,以便通过gym的标准形式调用该环境。环境注册需要三步:
1、将环境文件拷贝到gym安装目录gym/gym/envs/classic_control目录中,拷贝到此目录中的原因是要使用rendering模块绘制环境图像;
2、打开此目录中的初始化文件__init__.py,在文件末尾插入语句:
from gym.envs.classic_control.grid_mdp GridEnv
3、进入上一级目录gym/gym/envs中,打开此目录中的初始化文件__init__.py,在文件末尾插入语句:
register ( id='GridWorld-v0', entry_point='gym.envs.classic_control:GridEnv', max_episode_steps=200, reward_threshold=100.0, )
环境构建好以后,我们就来测试一下环境。
测试代码如下所示:
import gym import time env=gym.make('GridWorld-v0') env.reset() env.render() time.sleep(3)
运行测试程序看到如下环境证明环境构建成功:
PS:环境中黑色圆代表陷阱,蓝色圆代表金币,绿色圆代表机器人。
三、策略迭代算法寻找最优策略:
现在,我们需要让机器人在此环境中以最快的速度寻找到金币,也就是寻找一个最优策略使得机器人寻找到金币的用时最短,而最优策略就是一组动作,且每一个动作在当前状态下都是最优动作,即当前状态下的最优状态动作值函数所对应的动作。由于机器人寻找金币的问题是一个马尔可夫决策过程问题,而马尔可夫决策过程问题符合使用动态
规划求解问题的两个条件:
1、整个优化问题可以分解为多个子优化问题:在MDP中,最优策略可以分解为一组最优动作;
2、子优化问题的解可以被存储和重复利用:在MDP中,最优动作的解为最优状态动作值函数,而最优状态动作值函数等于当前状态下的最优值函数;
因此,可以使用动态规划求解MDP问题,而动态规划有两种方法:
策略迭代算法:包含策略评估和策略改善。
在策略评估中,迭代计算每个状态的状态值函数直到达到当前策略的真实状态值函数,目的是为了更好的评估当前策略的价值;
在策略改善中,在每个状态下采用贪婪策略(确定性策略)以更新当前策略。
策略评估和策略改善交替进行,直到策略不变为止。
值迭代算法:包含策略评估和策略改善。
在策略评估中只迭代一次,计算初始策略下每个状态的状态值函数。
在策略改善中,在每个状态下采用贪婪策略,将最优状态动作值函数作为当前状态的状态值函数,因此策略改善迭代的是状态值函数,直到状态值函数不变为止。
值迭代算法策略评估和策略改善没有交替进行,只是循环策略改善部分迭代状态值函数。
由于策略迭代算法比值迭代算法迭代收敛速度更快,因此本例采用策略迭代算法寻找最优策略。
代码如下所示(代码中每行都有注释,以方便大家理解):
/
import gym import random import time env=gym.make('GridWorld-v0') class Learn: def __init__(self,grid_mdp): #初始化状态值函数 self.v=dict() for state in grid_mdp.states: self.v[state]=0 #初始化策略,这些策略均在状态转移概率矩阵中 self.pi=dict() #random.choice(seq):返回列表、元组、字符串的随机项 self.pi[1]=random.choice(['e','s']) self.pi[2]=random.choice(['e','w']) self.pi[3]=random.choice(['w','s','e']) self.pi[4]=random.choice(['e','w']) self.pi[5]=random.choice(['w','s']) #策略迭代函数 def policy_iterate(self,grid_mdp): #迭代100次直到策略不变为止 for i in range(100): #策略评估和策略改善交替进行 self.policy_evaluate(grid_mdp) self.policy_improve(grid_mdp) #策略评估: def policy_evaluate(self,grid_mdp): #迭代1000次计算每个状态的真实状态值函数 for i in range(1000): delta=0.0 #遍历状态空间 for state in grid_mdp.states: #终止状态不用计算状态值函数(v=0.0) if state in grid_mdp.terminate_states: continue action=self.pi[state] t,s,r=grid_mdp.transform(state,action) #if s!=-1: new_v=r+grid_mdp.gamma*self.v[s] delta+=abs(new_v-self.v[state]) #更新状态值函数 self.v[state]=new_v if delta < 1e-6: break #策略改善:遍历动作空间,寻找最优动作 def policy_improve(self,grid_mdp): #在每个状态下采用贪婪策略 for state in grid_mdp.states: #终止状态不用计算状态值函数(v=0.0)和求最优策略 if state in grid_mdp.terminate_states: continue #假设当前策略为最优动作 a1=self.pi[state] t,s,r=grid_mdp.transform(state,a1) #当不在状态转移概率中时,状态动作值函数不存在,状态值函数不变 #if s!=-1: #当前策略下最优状态动作值函数为最优状态值函数 v1=r+grid_mdp.gamma*self.v[s] #遍历动作空间与最优动作进行比较,从而找到最优动作 for action in grid_mdp.actions: #当不在状态转移概率中时,状态动作值函数不存在,状态值函数不变 t,s,r=grid_mdp.transform(state,action) if s!=-1: if v1 < r+grid_mdp.gamma*self.v[s]: a1=action v1=r+grid_mdp.gamma*self.v[s] #更新策略 self.pi[state]=a1 #最优动作 def action(self,state): return self.pi[state] gm=env.env #初始化智能体的状态 state=env.reset() #实例化对象,获得初始化状态值函数和初始化策略 learn=Learn(gm) #策略评估和策略改善 learn.policy_iterate(gm) total_reward=0 #最多走100步到达终止状态 for i in range(100): env.render() #每个状态的策略都是最优策略 action=learn.action(state) #每一步按照最优策略走 state,reward,done,_=env.step(action) total_reward+=reward time.sleep(1) if done: #显示环境中物体进入终止状态的图像 env.render() break
由于机器人每一步采取的都是最优策略,因此我们会看到机器人以最短时间寻找到了金币,具体gif动图就不插入了,大家可以运行上述代码看到最终效果,这就是通过策略迭代算法实现机器人寻找金币的案例!