动态规划法在扫地机器人中的实战应用(基于动作值函数的策略迭代 python 附源码)

简介: 动态规划法在扫地机器人中的实战应用(基于动作值函数的策略迭代 python 附源码)

需要源码或觉得有帮助请点赞关注收藏后评论区留下QQ邮箱或者私信博主

与基于状态值函数的策略迭代不同,基于动作值函数的策略迭代是在当前策略下用另一个式子进行评估。

关于条件描述和环境搭建可以参考我这篇博客扫地机器人简介

算法步骤如下

下面通过基于动作值函数的策略迭代算法应用于确定环境的扫地机器人任务中,经过多轮迭代后,得到下图中动作值函数和策略迭代的更新过程

代码运行结果如下 经过五次迭代逐渐收敛

部分代码如下

# 代11-例4.7-基于动作值函数的确定环境扫地机器人任务策略迭代
import numpy as np
world_h =  5
world_w = 5
length = world_h * world_w
gamma = 0.8
state = [i for i in range(length)]  # 状态(编号)
action = ['n', 's', 'w', 'e']  # 动作名称
ds_action = {'n': -world_w, 'e': 1, 's': world_w, 'w': -1}
value = [0 for i in range(length)]  # 初始化状态值函数,均为0.  [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
policy = np.zeros([length, len(action)])
suqe=[20, 21, 22, 23, 24, 15, 16, 17, 18, 19, 10, 11, 12, 13, 14, 5, 6, 7, 8, 9, 0, 1, 2, 3,4]
# 定义奖励
def reward(s):
    if s == 20:  # 到充电站
        return 1
    elif s == 12:  # 到陷阱中
        return -10
    elif s == 9:  # 到垃圾处
        return 3
    else:
        return 0  # 其他
    # in表示0是[*,*,*]中的一个
def getAction(a):
    if a == 'n':
        return 0
    elif a == 'e':
        return 3
    elif a == 's':
        return 1
    elif a == 'w':
        return 2
# 在s状态下执行动作a,返回下一状态(编号)
def next_states(s, a):
    # 越过边界时pass
    if (s < world_w and a == 'n') \
            or (s % world_w == 0 and a == 'w') \
            or (s > length - world_w - 1 and a == 's') \
            or ((s + 1) % world_w == 0 and a == 'e'):  # (s % (world_w - 1) == 0 and a == 'e' and s != 0)
        next_state = s  # 表现为next_state不变
    else:
        next_state = s + ds_action[a]  # 进入下一个状态
    return next_state
# 在s状态下执行动作,返回所有可能的下一状态(编号)list
def getsuccessor(s):
    successor = []
    for a in action:  # 遍历四个动作
        if s == next_states(s, a):
            continue
        else:
            # print("状态s=%s,动作a=%s"%(s,a))
            next = next_states(s, a)  # 得到下一个状态(编号)
        successor.append(next)  # 以list保存当前状态s下执行四个动作的下一状态
    # print(len(successor))
    return successor
def initPolicy():
    for s in range(length):
        for a in action:
            if next_states(s, a) == s:
                continue
            newAction = getAction(a)
            policy[s][newAction] = 1 / len(getsuccessor(s))
    # print(policy)
def Caculate_Q(s, V, num, discount_factor=0.8):
    """
    Returns:
        A vector of length env.nA containing the expected value of each action.
    """
    Q = np.zeros((length, 4))
    Q[:][:] = -100
    for a in action:  # 遍历能走的动作
        # for prob, next_state, reward, done in env.P[s][a]:
        #     Q[s][a] += prob * (reward + discount_factor * V[next_state])  # 计算当前状态s下的动作值函数列表 [q1,q2,q3,q4]
        next_state = next_states(s, a)
        if next_state == s:  # 碰壁
            continue
        rewards = reward(next_state)
        numberA = getAction(a)
        if s == 12:
            Q[s][numberA] = rewards + discount_factor * V[s]
        else:
            Q[s][numberA] = rewards + discount_factor * V[next_state]
        action_name1 = ""
        if (numberA == 0):
            action_name1 = "UP"
        if (numberA == 1):
            action_name1 = "DOWN"
        if (numberA == 2):
            action_name1 = "LEFT"
        if (numberA == 3):
            action_name1 = "RIGHT"
        print("Q[%s][%s]  %f = %s + 0.8 * %.2f:" % (num, action_name1, Q[s][numberA], rewards, V[next_state]))
    return Q
def max_index(lst_int):#最大值索引函数
    index = []
    max_n = max(lst_int)
    for i in range(len(lst_int)):
        if lst_int[i] == max_n:
            index.append(i)
    return index  #返回一个列表
def list_not_equal(old,new):#判断list是否相等
    listequal=False
    for i in range(0,4):
        if old[i]!=new[i]:
            listequal=True
            break
    return listequal
def policy_eval(theta=0.0001):
    V = np.zeros(length)  # 初始化状态值函数列表
    iter = 0
    while True:
        k = -1
        delta = 0  # 定义最大差值,判断是否有进行更新
        for s in suqe:  # 遍历所有状态 [0~25]
            k += 1
            if s in [9, 20, 12]:  # 若当前状态为吸入状态,则直接pass不做操作
                continue
            v = 0  # 针对每个状态值函数进行计算
            # print("第%d 的状态" % (k), end="")
            for a in action:
                newAction = getAction(a)
                next_state = next_states(s, a)
                rewards = reward(next_state)
                if next_state == 12:
                    v += policy[s][newAction] * (rewards + gamma * V[s])
                else:
                    v += policy[s][newAction] * (rewards + gamma * V[next_state])
            delta = max(delta, np.abs(v - V[s]))  # 更新差值
            V[s] = v  # 存储(更新)每个状态下的状态值函数,即伪代码中的 v <- V(s)
        value = np.array(V).reshape(world_h, world_w)
        iter += 1
        if delta < theta:  # 策略评估的迭代次数不能太多,否则状态值函数的数值会越来越大(即使算法仍然在收敛)
            break
    return V  # 一轮迭代结束后,状态值函数暂时固定
def policy_improvement(V, policy):  # 策略改进
    """
    Returns:
        policy: the optimal policy, a matrix of shape [S, A] where each state s contains a valid probability distribution over actions.
        V: the value function for the optimal policy.
    """
    k = -1
    policy_stable = True  # Will be set to false if we make any changes to the policy
    for s in [20, 21, 22, 23, 24, 15, 16, 17, 18, 19, 10, 11, 12, 13, 14, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4]:
        if s in [9, 20, 12]:
            k += 1
            continue;
        k += 1
 [i]
    return V, policy_stable
def Policy_Iterration_byQ():
    initPolicy()
    k = 1
    while True:  # Evaluate the current policy
        print("迭代的次数",k)
        Q = policy_eval()
        print(Q)
        Q,policy_stable = policy_improvement(Q,policy)
        print("*" * 100)
        k+=1
        if policy_stable:  # # If the policy is stable we've found an optimal policy. Return it
            return policy, Q
Policy_Iterration_byQ()
相关文章
|
3月前
|
SQL 关系型数据库 数据库
Python SQLAlchemy模块:从入门到实战的数据库操作指南
免费提供Python+PyCharm编程环境,结合SQLAlchemy ORM框架详解数据库开发。涵盖连接配置、模型定义、CRUD操作、事务控制及Alembic迁移工具,以电商订单系统为例,深入讲解高并发场景下的性能优化与最佳实践,助你高效构建数据驱动应用。
473 7
|
3月前
|
数据采集 Web App开发 数据安全/隐私保护
实战:Python爬虫如何模拟登录与维持会话状态
实战:Python爬虫如何模拟登录与维持会话状态
|
3月前
|
传感器 运维 前端开发
Python离群值检测实战:使用distfit库实现基于分布拟合的异常检测
本文解析异常(anomaly)与新颖性(novelty)检测的本质差异,结合distfit库演示基于概率密度拟合的单变量无监督异常检测方法,涵盖全局、上下文与集体离群值识别,助力构建高可解释性模型。
385 10
Python离群值检测实战:使用distfit库实现基于分布拟合的异常检测
|
3月前
|
数据采集 监控 数据库
Python异步编程实战:爬虫案例
🌟 蒋星熠Jaxonic,代码为舟的星际旅人。从回调地狱到async/await协程天堂,亲历Python异步编程演进。分享高性能爬虫、数据库异步操作、限流监控等实战经验,助你驾驭并发,在二进制星河中谱写极客诗篇。
Python异步编程实战:爬虫案例
|
3月前
|
Cloud Native 算法 API
Python API接口实战指南:从入门到精通
🌟蒋星熠Jaxonic,技术宇宙的星际旅人。深耕API开发,以Python为舟,探索RESTful、GraphQL等接口奥秘。擅长requests、aiohttp实战,专注性能优化与架构设计,用代码连接万物,谱写极客诗篇。
Python API接口实战指南:从入门到精通
|
3月前
|
存储 分布式计算 测试技术
Python学习之旅:从基础到实战第三章
总体来说,第三章是Python学习路程中的一个重要里程碑,它不仅加深了对基础概念的理解,还引入了更多高级特性,为后续的深入学习和实际应用打下坚实的基础。通过这一章的学习,读者应该能够更好地理解Python编程的核心概念,并准备好应对更复杂的编程挑战。
150 12
|
3月前
|
存储 数据采集 监控
Python文件操作全攻略:从基础到高级实战
本文系统讲解Python文件操作核心技巧,涵盖基础读写、指针控制、异常处理及大文件分块处理等实战场景。结合日志分析、CSV清洗等案例,助你高效掌握文本与二进制文件处理,提升程序健壮性与开发效率。(238字)
407 1
|
3月前
|
存储 Java 调度
Python定时任务实战:APScheduler从入门到精通
APScheduler是Python强大的定时任务框架,通过触发器、执行器、任务存储和调度器四大组件,灵活实现各类周期性任务。支持内存、数据库、Redis等持久化存储,适用于Web集成、数据抓取、邮件发送等场景,解决传统sleep循环的诸多缺陷,助力构建稳定可靠的自动化系统。(238字)
688 1
|
3月前
|
Java 调度 数据库
Python threading模块:多线程编程的实战指南
本文深入讲解Python多线程编程,涵盖threading模块的核心用法:线程创建、生命周期、同步机制(锁、信号量、条件变量)、线程通信(队列)、守护线程与线程池应用。结合实战案例,如多线程下载器,帮助开发者提升程序并发性能,适用于I/O密集型任务处理。
378 0
|
3月前
|
机器学习/深度学习 监控 数据挖掘
Python 高效清理 Excel 空白行列:从原理到实战
本文介绍如何使用Python的openpyxl库自动清理Excel中的空白行列。通过代码实现高效识别并删除无数据的行与列,解决文件臃肿、读取错误等问题,提升数据处理效率与准确性,适用于各类批量Excel清理任务。
452 0

热门文章

最新文章

推荐镜像

更多