蒙特卡洛法的简介以及实战应用(python实现 基于同策略首次访问蒙特卡洛算法 附源码)

简介: 蒙特卡洛法的简介以及实战应用(python实现 基于同策略首次访问蒙特卡洛算法 附源码)

需要源码或数据集请点赞关注收藏后评论区留言

一、蒙特卡洛法的基本概念

在实际问题中,通常不易获得完整的环境知识。蒙特卡洛法(MC)正是基于统计学的思想,通过大量采样获取数据来进行学习的方法称为经验方法。MC正式基于经验方法,在环境模型位置的情况下,采用时间步有限的,完整的情节根据经验进行学习。并通过平均采样回报来解决强化学习问题。

二、蒙特卡洛法(MC)的核心要素

1:经验:经验是从环境交互中获得的序列,它是情节的集合也就是样本集,经验既可以是真实经验也可以是模拟经验

2:情节:一段经验可以分为多个情节,每一个情节都是一个完整的序列,即一定有终止状态。

3:完整回报与目标值:因为只有到达终止状态才能计算回报,所以将情节的回报G称为完整回报,G也称为MC的目标值

三、蒙特卡洛预测

基于首次访问的MC预测算法步骤如下图

部分代码如下

from collections import defaultdict
from my_book_gridworld0406 import GridWorldEnv
import numpy as np
class Agent():
    def __init__(self):
        self.Q = {}  # {state:{action:q_value}} 初始化Q
    def create_epsilon_greedy_policy(self, nA):
        """
        Creates an epsilon-greedy policy based on Q values.
        基于Q值创建贪婪策略
alid_nA
            best_action = max(self.Q[state], key=self.Q[state].get)  # 获取最大value值对应的key,即得到最大动作值函数对应的动作
            A[best_action] += 1.0 - epsilon
            return A
        return policy_fn
    def mc_control_epsilon_greedy(self, env, gamma, max_episode_num):
        # flag = True
        returns_sum = defaultdict(float)
        returns_count = defaultdict(float)
        target_policy = self.create_epsilon_greedy_policy(env.action_space.n)
        num_episode = 0
        for state in range(env.observation_space.n):
            self.initValue(state, env.valid_actions_for_states, randomized=False)
        print("episode:{}".format(num_episode))
        print(epsilon_by_epsiode(num_episode))
        for s in print_states:
            if s in self.Q.keys():
                print("{}_Q:".format(s), end="")
                Q_s = []
                for a in self.Q[s].keys():
                    Q_s.append(round(self.Q[s][a], 3))
                print(Q_s)
                probs = target_policy(env.valid_actions_for_states, s, epsilon_by_epsiode(num_episode))
                action = np.random.choice(np.arange(len(probs)), p=probs)
                p = []
                for a in range(len(probs)):
                    p.append(round(probs[a], 3))
                print(p)
                print(action)
        while num_episode < max_episode_num:
            episode = []
            state = env.reset()
            while True:
                # env.render()
                probs = target_policy(env.valid_actions_for_states, state, epsilon_by_epsiode(num_episode))
                action = np.random.choice(np.arange(len(probs)), p=probs)
                next_state, reward, done, _ = env.step(action)
                episode.append((state, action, reward))
                if done:
                    break
                state = next_state
            num_episode += 1
            # Find all (state, action) pairs we've visited in this episode
            # We convert each state to a tuple so that we can use it as a dict key
            sa_in_episode = set([(x[0], x[1]) for x in episode])
            for state, action in sa_in_episode:
                sa_pair = (state, action)
                # Find the first occurance of the (state, action) pair in the episode
                first_occurence_idx = next(i for i, x in enumerate(episode)
                                           if x[0] == state and x[1] == action)
                # Sum up all rewards since the first occurance
                G = sum([x[2] * (gamma ** i) for i, x in enumerate(episode[first_occurence_idx:])])
                # Calculate average return for this state over all sampled episodes
                returns_sum[sa_pair] += G
                returns_count[sa_pair] += 1.0
                self.__setQValue(state, action, returns_sum[sa_pair] / returns_count[sa_pair])
            if num_episode in print_episodes:
                print("episode:{}".format(num_episode))
                print(epsilon_by_epsiode(num_episode))
                for s in print_states:
                    if s in self.Q.keys():
                        print("{}_Q:".format(s), end="")
                        Q_s = []
                        for a in self.Q[s].keys():
                            Q_s.append(round(self.Q[s][a], 3))
                        print(Q_s)
                        probs = target_policy(env.valid_actions_for_states, s, epsilon_by_epsiode(num_episode))
                        action = np.random.choice(np.arange(len(probs)), p=probs)
                        p = []
                        for a in range(len(probs)):
                            p.append(round(probs[a], 3))
                        print(p)
                        print(action)
        return self.Q
    # return a possible action list for a given state
    # def possibleActionsForstate(self, state):
    #  actions = []
    #  # add your code here
    #  return actions
    # if a state exists in Q dictionary
    def __isStateInQ(self, state):
        # 判断空值。有值则返回值,无值则返回None - None is not None = False
        return self.Q.get(state) is not None  # 因为是实例属性,所以要用self.进行引用
    def initValue(self, s, valid_actions_list, randomized=False):  # 初始化Q和E
        # Q[s]为空值时进入判断
        if not self.__isStateInQ(s):
            self.Q[s] = {}  # 初始化Q
            for a in valid_actions_list[s]:  # 遍历所有action_name
                self.Q[s][
                    a] = np.random().random() / 10 if randomized is True else 0.0  # 初始化Q(s,a);随机一个动作值函数。只有结束状态的Q(s,a) = 0
    """Q的获取与设置方法"""
    def __getQValue(self, s, a):  # ①
        return self.Q[s][a]  # argmax(q)
    def __setQValue(self, s, a, new_q):  # ②
        self.Q[s][a] = new_q
np.random.seed(1)
epsilon_start = 0.5
epsilon_final = 0
epsilon_episodes = 20000
epsilon_by_epsiode = lambda episode_idx: epsilon_start - (epsilon_start - epsilon_final) * min(episode_idx,
                                                                                               epsilon_episodes) / epsilon_episodes
agent = Agent()
env = GridWorldEnv()
print_states = [5, 10, 18, 20, 24]
print_episodes = [1, 7500, 12500, 19999, 20000]
Q = agent.mc_control_epsilon_greedy(env=env, gamma=0.8, max_episode_num=20000)

部分运行效果如下

 

相关文章
|
1天前
|
算法 Serverless 数据处理
从集思录可转债数据探秘:Python与C++实现的移动平均算法应用
本文探讨了如何利用移动平均算法分析集思录提供的可转债数据,帮助投资者把握价格趋势。通过Python和C++两种编程语言实现简单移动平均(SMA),展示了数据处理的具体方法。Python代码借助`pandas`库轻松计算5日SMA,而C++代码则通过高效的数据处理展示了SMA的计算过程。集思录平台提供了详尽且及时的可转债数据,助力投资者结合算法与社区讨论,做出更明智的投资决策。掌握这些工具和技术,有助于在复杂多变的金融市场中挖掘更多价值。
22 12
|
6天前
|
机器学习/深度学习 存储 算法
近端策略优化(PPO)算法的理论基础与PyTorch代码详解
近端策略优化(PPO)是深度强化学习中高效的策略优化方法,广泛应用于大语言模型的RLHF训练。PPO通过引入策略更新约束机制,平衡了更新幅度,提升了训练稳定性。其核心思想是在优势演员-评论家方法的基础上,采用裁剪和非裁剪项组成的替代目标函数,限制策略比率在[1-ϵ, 1+ϵ]区间内,防止过大的策略更新。本文详细探讨了PPO的基本原理、损失函数设计及PyTorch实现流程,提供了完整的代码示例。
114 10
近端策略优化(PPO)算法的理论基础与PyTorch代码详解
|
7天前
|
监控 算法 安全
内网桌面监控软件深度解析:基于 Python 实现的 K-Means 算法研究
内网桌面监控软件通过实时监测员工操作,保障企业信息安全并提升效率。本文深入探讨K-Means聚类算法在该软件中的应用,解析其原理与实现。K-Means通过迭代更新簇中心,将数据划分为K个簇类,适用于行为分析、异常检测、资源优化及安全威胁识别等场景。文中提供了Python代码示例,展示如何实现K-Means算法,并模拟内网监控数据进行聚类分析。
28 10
|
25天前
|
存储 算法 安全
控制局域网上网软件之 Python 字典树算法解析
控制局域网上网软件在现代网络管理中至关重要,用于控制设备的上网行为和访问权限。本文聚焦于字典树(Trie Tree)算法的应用,详细阐述其原理、优势及实现。通过字典树,软件能高效进行关键词匹配和过滤,提升系统性能。文中还提供了Python代码示例,展示了字典树在网址过滤和关键词屏蔽中的具体应用,为局域网的安全和管理提供有力支持。
50 17
|
27天前
|
算法 安全 Java
Java线程调度揭秘:从算法到策略,让你面试稳赢!
在社招面试中,关于线程调度和同步的相关问题常常让人感到棘手。今天,我们将深入解析Java中的线程调度算法、调度策略,探讨线程调度器、时间分片的工作原理,并带你了解常见的线程同步方法。让我们一起破解这些面试难题,提升你的Java并发编程技能!
68 16
|
1月前
|
负载均衡 算法
架构学习:7种负载均衡算法策略
四层负载均衡包括数据链路层、网络层和应用层负载均衡。数据链路层通过修改MAC地址转发帧;网络层通过改变IP地址实现数据包转发;应用层有多种策略,如轮循、权重轮循、随机、权重随机、一致性哈希、响应速度和最少连接数均衡,确保请求合理分配到服务器,提升性能与稳定性。
250 11
架构学习:7种负载均衡算法策略
|
1月前
|
机器学习/深度学习 人工智能 算法
基于Python深度学习的眼疾识别系统实现~人工智能+卷积网络算法
眼疾识别系统,本系统使用Python作为主要开发语言,基于TensorFlow搭建卷积神经网络算法,并收集了4种常见的眼疾图像数据集(白内障、糖尿病性视网膜病变、青光眼和正常眼睛) 再使用通过搭建的算法模型对数据集进行训练得到一个识别精度较高的模型,然后保存为为本地h5格式文件。最后使用Django框架搭建了一个Web网页平台可视化操作界面,实现用户上传一张眼疾图片识别其名称。
135 5
基于Python深度学习的眼疾识别系统实现~人工智能+卷积网络算法
|
1月前
|
机器学习/深度学习 人工智能 算法
机器学习算法的优化与改进:提升模型性能的策略与方法
机器学习算法的优化与改进:提升模型性能的策略与方法
276 13
机器学习算法的优化与改进:提升模型性能的策略与方法
|
2月前
|
算法 网络协议 Python
探秘Win11共享文件夹之Python网络通信算法实现
本文探讨了Win11共享文件夹背后的网络通信算法,重点介绍基于TCP的文件传输机制,并提供Python代码示例。Win11共享文件夹利用SMB协议实现局域网内的文件共享,通过TCP协议确保文件传输的完整性和可靠性。服务器端监听客户端连接请求,接收文件请求并分块发送文件内容;客户端则连接服务器、接收数据并保存为本地文件。文中通过Python代码详细展示了这一过程,帮助读者理解并优化文件共享系统。
|
2月前
|
存储 缓存 监控
局域网屏幕监控系统中的Python数据结构与算法实现
局域网屏幕监控系统用于实时捕获和监控局域网内多台设备的屏幕内容。本文介绍了一种基于Python双端队列(Deque)实现的滑动窗口数据缓存机制,以处理连续的屏幕帧数据流。通过固定长度的窗口,高效增删数据,确保低延迟显示和存储。该算法适用于数据压缩、异常检测等场景,保证系统在高负载下稳定运行。 本文转载自:https://www.vipshare.com
132 66

推荐镜像

更多