动态规划法和策略迭代在扫地机器人中确定状态值和动作值函数的策略评估(python实现 附源码 超详细)

简介: 动态规划法和策略迭代在扫地机器人中确定状态值和动作值函数的策略评估(python实现 附源码 超详细)

觉得有帮助或需要源码请点赞关注收藏后评论区留言或私信博主要

在强化学习中,动态规划法主要用于求解有模型的MDP问题,尽管在现实任务中难以获得完备的环境模型,且动态规划法需要消耗大量的计算资源,但是作为强化学习的基础,动态规划法仍然具有非常重要的理论意义。

动态规划法主要包括基于模型的策略迭代和基于模型的值迭代两种。这两种算法都是利用值函数来评价策略的,一旦计算出满足贝尔曼最优方程的最优状态值函数V或最优动作值函数Q,就能得到最优策略。

策略迭代通过构建策略的值函数(状态值函数v 动作值函数q)来评估当前策略,并利用这些值函数给出改进的新策略,策略跌打由策略评估和策略改进两部分组成。

下面通过扫地机器人在不同环境下的策略评估进行实战

基于状态值函数的策略评估

 

1:确定环境如下

部分代码如下

# 代05-例4.1-基于状态值函数的确定环境扫地机器人任务策略评估
import numpy as np
from 扫地机器人gym环境 import GridWorldEnv
env = GridWorldEnv()
"""定义格子世界参数"""
world_h =  5
world_w = 5
length = world_h * world_w
gamma = 0.8
state = [i for i in range(length)]  # 状态(编号)
action = ['n', 's', 'w', 'e']  # 动作名称
ds_action = {'n': -world_w, 'e': 1, 's': world_w, 'w': -1}
policy = np.zeros([length, len(action)])
suqe=[20, 21, 22, 23, 24, 15, 16, 17, 18, 19, 10, 11, 12, 13, 14, 5, 6, 7, 8, 9, 0, 1, 2, 3,4]
# 定义奖励
def reward(s):
    if s == 20:  # 到充电站
        return 1
    elif s == 12:  # 到陷阱中
        return -10
    elif s == 9:  # 到垃圾处
        return 3
    else:
        return 0  # 其他
def getAction(a):
    if a == 'n':
        return 0
    elif a == 'e':
        return 3
    elif a == 's':
        return 1
    elif a == 'w':
        return 2
入状态,则直接pass不做操作
                continue
            v = 0  # 针对每个状态值函数进行计算
            print("第%d 的状态" % (k), end="")
            for a in action:
                newAction = getAction(a)
                next_state = next_states(s, a)
                rewards = reward(next_state)
                if next_state == 12:
                    v += policy[s][newAction] * (rewards + gamma * V[s])
                    # print(" %.2f*(%d+%.1f*%.3f)+" % (policy[s][newAction], rewards, gamma, V[next_state]), end="")
                    print(" %.2f*(%d+%.1f*%.3f)+" % (policy[s][newAction], rewards, gamma, V[next_state]), end="")
                else:
                    v += policy[s][newAction] * (rewards + gamma * V[next_state])
                    #     print("%.2f*(%d+%.1f*%.2f)+" % (policy[s][newAction], rewards, gamma, value[next_state]), end="")
                    # print()
                    # successor = getsuccessor(s)
                    # for next_state in successor:
                    #     rewards = reward(next_state)
                    #     v += 1 / len(successor) * (rewards + gamma * V[next_state])
                    print(" %.2f*(%d+%.1f*%.3f)+" % (policy[s][newAction], rewards, gamma, V[next_state]), end="")
            print("v = %.3f" % (v))
            delta = max(delta, np.abs(v - V[s]))  # 更新差值
            V[s] = v  # 存储(更新)每个状态下的状态值函数,即伪代码中的 v <- V(s)
        value = np.array(V).reshape(world_h, world_w)
        iter += 1
        print('k=', iter)  # 打印迭代次数
        print("当前的状态值函数为:")
        print(np.round(value, decimals=3))# 输出当前的状态值函数
        if delta < theta:  # 策略评估的迭代次数不能太多,否则状态值函数的数值会越来越大(即使算法仍然在收敛)
            break
    return V  # 一轮迭代结束后,状态值函数暂时固定
initPolicy()
value = policy_eval()

随机环境的状态值函数策略评估此处省略 有需要请点赞关注收藏后私信博主

基于动作值函数的策略评估

部分代码如下

# 代08-例4.4-基于动作值函数的随机环境扫地机器人任务策略评估
import numpy as np
import pandas as pd
"""定义格子世界参数"""
world_h =  5
world_w = 5
length = world_h * world_w
gamma = 0.8
action = ['n', 's', 'w', 'e']  # 动作名称
ds_action = {'n': -world_w, 'e': 1, 's': world_w, 'w': -1}
policy = np.zeros([length, len(action)])
suqe=[20, 21, 22, 23, 24, 15, 16, 17, 18, 19, 10, 11, 12, 13, 14, 5, 6, 7, 8, 9, 0, 1, 2, 3,4]
# 定义奖励
def reward(s):
    if s == 20:  # 到充电站
        return 1
    elif s == 12:  # 到陷阱中
        return -10
    elif s == 9:  # 到垃圾处
        return 3
    else:
        return 0  # 其他
    # in表示0是[*,*,*]中的一个
def getAction(a):
    if a == 'n':
        return 0
    elif a == 'e':
        return 3
    elif a == 's':
        return 1
    elif a == 'w':
        return 2
# 在s状态下执行动作a,返回下一状态(编号)
def next_states(s, a):
    # 越过边界时pass
    if (s < world_w and a == 'n') \
            or (s % world_w == 0 and a == 'w') \
            or (s > length - world_w - 1 and a == 's') \
            or ((s + 1) % world_w == 0 and a == 'e'):  # (s % (world_w - 1) == 0 and a == 'e' and s != 0)
        next_state = s  # 表现为next_state不变
    else:
        next_state = s + ds_action[a]  # 进入下一个状态
    return next_state
# 在s状态下执行动作,返回所有可能的下一状态(编号)list
def getsuccessor(s):
    successor = []
    for a in action:  # 遍历四个动作
        if s == next_states(s, a):
            continue
        else:
            # print("状态s=%s,动作a=%s"%(s,a))
            next = next_states(s, a)  # 得到下一个状态(编号)
        successor.append(next)  # 以list保存当前状态s下执行四个动作的下一状态
    # print(len(successor))
    return successor
def envActionPolicy(a):
    if a == 'n':
        return 's'
    elif a == 's':
        return 'n'
    elif a == 'e':
        return 'w'
    elif a == 'w':
        return 'e'
def CaValue(Q):
    v = [0 for i in range(length)]
    for i in range(length):
        for a in action:
            newAction = getAction(a)
            v[i] += policy[i][newAction] * Q.loc[i, a]
    value = np.array(v).reshape(world_h, world_w)
    print(np.round(value, decimals=4))
def sumQ_nextstate(s, Q, visios):
    sum = 0
    for i in action:
        newAction = getAction(i)
        sum += policy[s][newAction] * Q.loc[s, i]
    return sum
def initPolicy():
    for s in range(length):
        for a in action:
            if next_states(s, a) == s:
                continue
            newAction = getAction(a)
            policy[s][newAction] = 1 / len(getsuccessor(s))
    # print(policy)
def policy_eval_Q_random(theta=0.0001):
    Q = pd.DataFrame(
        np.zeros((length, len(action))),  # q_table initial values
        columns=action,  # actions's name
    )
    iter = 0
    while True:
        k = -1
        delta = 0  # 定义最大差值,判断是否有进行更新
        for s in suqe:  # 遍历所有状态 [0~25]
            visio = False
            k += 1
            if s in [9, 20, 12]:  # 若当前状态为吸入状态,则直接pass不做操作
                continue
            if s == 17:
                visio = True
            # [[-0.7954 - 1.0218 - 1.2655 - 0.1564  1.369]
            #  [-1.066 - 1.9614 - 3.8893 - 0.7455  0.]
            # [-1.4346 - 4.176
            # 0. - 3.5631 - 0.0563]
            # [-0.489 - 1.7904 - 4.1252 - 1.7891 - 0.6118]
            # [0. - 0.4778 - 1.3917 - 0.9611 - 0.5992]]
            for a in action:
                newAction = getAction(a)
                env_action = envActionPolicy(a)
                next_state = next_states(s, a)
                env_state = next_states(s, env_action)
                rewards = reward(next_state)
                env_rewards = reward(env_state)
                if policy[s][newAction] == 0:
                    continue
                if next_state == 12:
                    q = 0.8 * (rewards + gamma * sumQ_nextstate(s, Q, visio)) + 0.15 * (
                                gamma * sumQ_nextstate(s, Q, visio)) + 0.05 * (
                                    env_rewards + gamma * (sumQ_nextstate(env_state, Q, visio)))
                    if visio == True:
                        print("q=%.2f=0.8*(%.2f+%.2f*%.2f)+0.15*(%.2f*%.2f)+0.05*(%.2f+%.2f*%.2f)"
                              % (q, rewards, gamma, sumQ_nextstate(s, Q, visio), gamma, sumQ_nextstate(s, Q, visio),
                                 env_rewards, gamma, sumQ_nextstate(env_state, Q, visio)))
                else:
                    q = 0.8 * (rewards + gamma * sumQ_nextstate(next_state, Q, visio)) + 0.05 * (
                                env_rewards + gamma * sumQ_nextstate(env_state, Q, visio)) \
                        + 0.15 * gamma * sumQ_nextstate(s, Q, visio)
                    if visio == True:
                        print("q=%.2f=0.8*(%.2f+%.2f*%.2f)+0.15*(%.2f*%.2f)+0.05*(%.2f+%.2f*%.2f)" % (q,rewards, gamma,
                                                                                                      sumQ_nextstate(next_state, Q,visio), gamma,
                                                                                                      sumQ_nextstate(s,Q,visio),
                                                                                                      env_rewards,gamma,
                                                                                                      sumQ_nextstate(env_state, Q,visio)))
                delta = max(delta, np.abs(q - Q.loc[s, a]))  # 更新差值
                Q.loc[s, a] = q  # 存储(更新)每个状态下的状态值函数,即伪代码中的 v <- V(s)
        iter += 1
        print('k=', iter)  # 打印迭代次数
        k = 0
        Q1 = pd.DataFrame(
            np.zeros((length, len(action))),  # q_table initial values
            columns=action,  # actions's name
        )
        for s in suqe:
            Q1.loc[k] = Q.loc[s]
            k = k + 1
        Q1.rename(columns={'n': 'UP', 's': 'DOWN', 'w': 'LEFT', 'e': 'RIGHT'}, inplace=True)
        print(Q1)
        if delta < theta:  # 策略评估的迭代次数不能太多,否则状态值函数的数值会越来越大(即使算法仍然在收敛)
            break
        # CaValue(Q)
    return Q
initPolicy()
q = policy_eval_Q_random()
CaValue(q)

确定环境下的省略不表 有需要请点赞关注收藏后私信博主要

相关文章
|
2月前
|
搜索推荐 Python
利用Python内置函数实现的冒泡排序算法
在上述代码中,`bubble_sort` 函数接受一个列表 `arr` 作为输入。通过两层循环,外层循环控制排序的轮数,内层循环用于比较相邻的元素并进行交换。如果前一个元素大于后一个元素,就将它们交换位置。
152 67
|
22天前
|
Python
[oeasy]python057_如何删除print函数_dunder_builtins_系统内建模块
本文介绍了如何删除Python中的`print`函数,并探讨了系统内建模块`__builtins__`的作用。主要内容包括: 1. **回忆上次内容**:上次提到使用下划线避免命名冲突。 2. **双下划线变量**:解释了双下划线(如`__name__`、`__doc__`、`__builtins__`)是系统定义的标识符,具有特殊含义。
27 3
|
25天前
|
JSON 监控 安全
深入理解 Python 的 eval() 函数与空全局字典 {}
`eval()` 函数在 Python 中能将字符串解析为代码并执行,但伴随安全风险,尤其在处理不受信任的输入时。传递空全局字典 {} 可限制其访问内置对象,但仍存隐患。建议通过限制函数和变量、使用沙箱环境、避免复杂表达式、验证输入等提高安全性。更推荐使用 `ast.literal_eval()`、自定义解析器或 JSON 解析等替代方案,以确保代码安全性和可靠性。
34 2
|
10天前
|
存储 人工智能 Python
[oeasy]python061_如何接收输入_input函数_字符串_str_容器_ 输入输出
本文介绍了Python中如何使用`input()`函数接收用户输入。`input()`函数可以从标准输入流获取字符串,并将其赋值给变量。通过键盘输入的值可以实时赋予变量,实现动态输入。为了更好地理解其用法,文中通过实例演示了如何接收用户输入并存储在变量中,还介绍了`input()`函数的参数`prompt`,用于提供输入提示信息。最后总结了`input()`函数的核心功能及其应用场景。更多内容可参考蓝桥、GitHub和Gitee上的相关教程。
10 0
|
1月前
|
Python
Python中的函数是**一种命名的代码块,用于执行特定任务或计算
Python中的函数是**一种命名的代码块,用于执行特定任务或计算
60 18
|
1月前
|
数据可视化 DataX Python
Seaborn 教程-绘图函数
Seaborn 教程-绘图函数
77 8
|
1月前
|
Python
Python中的函数
Python中的函数
54 8
|
4天前
|
人工智能 机器人 API
AppFlow:无代码部署Dify作为钉钉智能机器人
本文介绍如何通过计算巢AppFlow完成Dify的无代码部署,并将其配置到钉钉中作为智能机器人使用。首先,在钉钉开放平台创建应用,获取Client ID和Client Secret。接着,创建消息卡片模板并授予应用发送权限。然后,使用AppFlow模板创建连接流,配置Dify鉴权凭证及钉钉连接凭证,完成连接流的发布。最后,在钉钉应用中配置机器人,发布应用版本,实现与Dify应用的对话功能。
AppFlow:无代码部署Dify作为钉钉智能机器人
|
2月前
|
人工智能 自然语言处理 算法
具身智能高校实训解决方案 ----从AI大模型+机器人到通用具身智能
在具身智能的发展历程中,AI 大模型的出现成为了关键的推动力量。高校作为培养未来科技人才的摇篮,需要紧跟这一前沿趋势,开展具身智能实训课程。通过将 AI 大模型与具备 3D 视觉的机器人相结合,为学生搭建一个实践平台。
256 64
|
1月前
|
机器学习/深度学习 人工智能 算法
人工智能与机器人的结合:智能化世界的未来
人工智能与机器人的结合:智能化世界的未来
227 32

热门文章

最新文章