深度强化学习中Double DQN算法(Q-Learning+CNN)的讲解及在Asterix游戏上的实战(超详细 附源码)

简介: 深度强化学习中Double DQN算法(Q-Learning+CNN)的讲解及在Asterix游戏上的实战(超详细 附源码)

需要源码和环境搭建请点赞关注收藏后评论区留下QQ~~~

一、核心思想

针对DQN中出现的高估问题,有人提出深度双Q网络算法(DDQN),该算法是将强化学习中的双Q学习应用于DQN中。在强化学习中,双Q学习的提出能在一定程度上缓解Q学习带来的过高估计问题。

DDQN的主要思想是在目标值计算时将动作的选择和评估分离,在更新过程中,利用两个网络来学习两组权重,分别是预测网络的权重W和目标网络的权重W',在DQN中,动作选择和评估都是通过目标网络来实现的,而在DDQN中,计算目标Q值时,采取目标网络获取最优动作,再通过预测网络估计该最优动作的目标Q值,这样就可以将最优动作选额和动作值函数估计分离,采用不用的样本保证独立性

二、允许结果与分析

本节实验在Asterix游戏上,通过控制参数变量对DQN,DDQN算法进行性能对比从而验证了在一定程度上,DDQN算法可以缓解DQN算法的高估问题,DDQN需要两个不同的参数网络,每1000步后预测预测网络的参数同步更新给目标网络,实验设有最大能容纳1000000记录的缓冲池,每个Atari游戏,DDQN算法训练1000000时间步

实战结果如下图所示,图中的DDQN算法最后收敛回报明显大于DQN,并且在实验过程中,可以发现DQN算法容易陷入局部的情况,其问题主要在于Q-Learning中的最大化操作,Agent在选择动作时每次都取最大Q值得动作,对于真实的策略来说,在给定的状态下并不是每次都选择Q值最大的动作,因为一般真实的策略都是随机性策略,所以在这里目标值直接选择动作最大的Q值往往会导致目标值高于真实值

为了解决值函数高估计的问题,DDQN算法将动作的选择和动作的评估分别用不同的值函数来实现,结果表明DDQN能够估计出更准确的Q值,在一些Atari2600游戏中可获得更稳定有效的策略

三、代码

部分源码如下

import gym, random, pickle, os.path, math, glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import torch.autograd as autograd
import pdb
from atari_wrappers import make_atari, wrap_deepmind,LazyFrames
    def __init__(self, in_channels=4, num_actions=5):
       nnels: number of channel of input.
                i.e The number of most recent frames stacked together as describe in the paper
            num_actions: number of action-value to output, one-to-one correspondence to action in game.
        """
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.fc4 = nn.Linear(7 * 7 * 64, 512)
        self.fc5 = nn.Linear(512, num_actions)
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.fc4(x.view(x.size(0), -1)))
        return self.fc5(x)
class Memory_Buffer(object):
    def __init__(self, memory_size=1000):
        self.buffer = []
        self.memory_size = memory_size
        self.next_idx = 0
    def push(self, state, action, reward, next_state, done):
        data = (state, action, reward, next_state, done)
        if len(self.buffer) <= self.memory_size: # buffer not full
            self.buffer.append(data)
        else: # buffer is full
            self.buffer[self.next_idx] = data
        self.next_idx = (self.next_idx + 1) % self.memory_size
    def sample(self, batch_size):
        states, actions, rewards, next_states, dones = [], [], [], [], []
        for i in range(batch_size):
            idx = random.randint(0, self.size() - 1)
            data = self.buffer[idx]
            state, action, reward, next_state, done= data
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            next_states.append(next_state)
            dones.append(done)
        return np.concatenate(states), actions, rewards, np.concatenate(next_states), dones
    def size(self):
        return len(self.buffer)
class DDQNAgent:
    def __init__(self, in_channels = 1, action_space = [], USE_CUDA = False, memory_size = 10000, epsilon  = 1, lr = 1e-4):
        self.epsilon = epsilon
        self.action_space = action_space
        self.memory_buffer = Memory_Buffer(memory_size)
        self.DQN = DQN(in_channels = in_channels, num_actions = action_space.n)
        self.DQN_target = DQN(in_channels = in_channels, num_actions = action_space.n)
        self.DQN_target.load_state_dict(self.DQN.state_dict())
        self.USE_CUDA = USE_CUDA
        if USE_CUDA:
            self.DQN = self.DQN.to(device)
            self.DQN_target = self.DQN_target.to(device)
        self.optimizer = optim.RMSprop(self.DQN.parameters(),lr=lr, eps=0.001, alpha=0.95)
    def observe(self, lazyframe):
        # from Lazy frame to tensor
        state =  torch.from_numpy(lazyframe._force().transpose(2,0,1)[None]/255).float()
        if self.USE_CUDA:
            state = state.to(device)
        return state
    def value(self, state):
        q_values = self.DQN(state)
        return q_values
    def act(self, state, epsilon = None):
        """
        sample actions with epsilon-greedy policy
        recap: with p = epsilon pick random action, else pick action with highest Q(s,a)
        """
        if epsilon is None: epsilon = self.epsilon
        q_values = self.value(state).cpu().detach().numpy()
        if random.random()<epsilon:
            aciton = random.randrange(self.action_space.n)
        else:
            aciton = q_values.argmax(1)[0]
        return aciton
    def compute_td_loss(self, states, actions, rewards, next_states, is_done, gamma=0.99):
        """ Compute td loss using torch operations only. Use the formula above. """
        actions = torch.tensor(actions).long()    # shape: [batch_size]
        rewards = torch.tensor(rewards, dtype =torch.float)  # shape: [batch_size]
        is_done = torch.tensor(is_done, dtype = torch.uint8)  # shape: [batch_size]
        if self.USE_CUDA:
            actions = actions.to(device)
            rewards = rewards.to(device)
            is_done = is_done.to(device)
        # get q-values for all actions in current states
        predicted_qvalues = self.DQN(states)
        # select q-values for chosen actions
        predicted_qvalues_for_actions = predicted_qvalues[
          range(states.shape[0]), actions
        ]
        # compute q-values for all actions in next states
        ## Where DDQN is different from DQN
        predicted_next_qvalues_current = self.DQN(next_states)
        predicted_next_qvalues_target = self.DQN_target(next_states)
        # compute V*(next_states) using predicted next q-values
        next_state_values =  predicted_next_qvalues_target.gather(1, torch.max(predicted_next_qvalues_current, 1)[1].unsqueeze(1)).squeeze(1)
        # compute "target q-values" for loss - it's what's inside square parentheses in the above formula.
        target_qvalues_for_actions = rewards + gamma *next_state_values
        # at the last state we shall use simplified formula: Q(s,a) = r(s,a) since s' doesn't exist
        target_qvalues_for_actions = torch.where(
            is_done, rewards, target_qvalues_for_actions)
        # mean squared error loss to minimize
        #loss = torch.mean((predicted_qvalues_for_actions -
        #                   target_qvalues_for_actions.detach()) ** 2)
        loss = F.smooth_l1_loss(predicted_qvalues_for_actions, target_qvalues_for_actions.detach())
        return loss
    def sample_from_buffer(self, batch_size):
        states, actions, rewards, next_states, dones = [], [], [], [], []
        for i in range(batch_size):
            idx = random.randint(0, self.memory_buffer.size() - 1)
            data = self.memory_buffer.buffer[idx]
            frame, action, reward, next_frame, done= data
            states.append(self.observe(frame))
            actions.append(action)
            rewards.append(reward)
            next_states.append(self.observe(next_frame))
            dones.append(done)
        return torch.cat(states), actions, rewards, torch.cat(next_states), dones
    def learn_from_experience(self, batch_size):
        if self.memory_buffer.size() > batch_size:
            states, actions, rewards, next_states, dones = self.sample_from_buffer(batch_size)
            td_loss = self.compute_td_loss(states, actions, rewards, next_states, dones)
            self.optimizer.zero_grad()
            td_loss.backward()
            for param in self.DQN.parameters():
                param.grad.data.clamp_(-1, 1)
            self.optimizer.step()
            return(td_loss.item())
        else:
            return(0)
def moving_average(a, n=3) :
    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return ret[n - 1:] / n
def plot_training(frame_idx, rewards, losses):
    clear_output(True)
    plt.figure(figsize=(20,5))
    plt.subplot(131)
    plt.title('frame %s. reward: %s' % (frame_idx, np.mean(rewards[-100:])))
    plt.plot(moving_average(rewards,20))
    plt.subplot(132)
    plt.title('loss, average on 100 stpes')
    plt.plot(moving_average(losses, 100),linewidth=0.2)
    plt.show()
# if __name__ == '__main__':
# Training DQN in PongNoFrameskip-v4
env = make_atari('PongNoFrameskip-v4')
env = wrap_deepmind(env, scale = False, frame_stack=True)
gamma = 0.99
epsilon_max = 1
epsilon_min = 0.01
eps_decay = 30000
frames = 1000000
USE_CUDA = True
learning_rate = 2e-4
max_buff = 100000
update_tar_interval = 1000
batch_size = 32
print_interval = 1000
log_interval = 1000
learning_start = 10000
win_reward = 18     # Pong-v4
win_break = True
action_space = env.action_space
action_dim = env.action_space.n
state_dim = env.observation_space.shape[0]
state_channel = env.observation_space.shape[2]
agent = DDQNAgent(in_channels = state_channel, action_space= action_space, USE_CUDA = USE_CUDA, lr = learning_rate)
#frame = env.reset()
episode_reward = 0
all_rewards = []
losses = []
episode_num = 0
is_win = False
# tensorboard
summary_writer = SummaryWriter(log_dir = "DDQN", comment= "good_makeatari")
# e-greedy decay
epsilon_by_frame = lambda frame_idx: epsilon_min + (epsilon_max - epsilon_min) * math.exp(
            -1. * frame_idx / eps_decay)
# plt.plot([epsilon_by_frame(i) for i in range(10000)])
for i in range(frames):
    epsilon = epsilon_by_frame(i)
    #state_tensor = agent.observe(frame)
    #action = agent.act(state_tensor, epsilon)
    #next_frame, reward, done, _ = env.step(action)
    #episode_reward += reward
    #agent.memory_buffer.push(frame, action, reward, next_frame, done)
    #frame = next_frame
    loss = 0
    if agent.memory_buffer.size() >= learning_start:
        loss = agent.learn_from_experience(batch_size)
        losses.append(loss)
    if i % print_interval == 0:
        print("frames: %5d, reward: %5f, loss: %4f, epsilon: %5f, episode: %4d" % (i, np.mean(all_rewards[-10:]), loss, epsilon, episode_num))
        summary_writer.add_scalar("Temporal Difference Loss", loss, i)
        summary_writer.add_scalar("Mean Reward", np.mean(all_rewards[-10:]), i)
        summary_writer.add_scalar("Epsilon", epsilon, i)
    if iQN_dict.pth.tar")
plot_training(i, all_rewards, losses)

创作不易 觉得有帮助请点赞关注收藏~~~

相关文章
|
3月前
|
算法 数据可视化 测试技术
HNSW算法实战:用分层图索引替换k-NN暴力搜索
HNSW是一种高效向量检索算法,通过分层图结构实现近似最近邻的对数时间搜索,显著降低查询延迟。相比暴力搜索,它在保持高召回率的同时,将性能提升数十倍,广泛应用于大规模RAG系统。
354 10
HNSW算法实战:用分层图索引替换k-NN暴力搜索
|
3月前
|
机器学习/深度学习 缓存 算法
微店关键词搜索接口核心突破:动态权重算法与语义引擎的实战落地
本文详解微店搜索接口从基础匹配到智能推荐的技术进阶路径,涵盖动态权重、语义理解与行为闭环三大创新,助力商家提升搜索转化率、商品曝光与用户留存,实现技术驱动的业绩增长。
|
3月前
|
机器学习/深度学习 存储 算法
淘宝图片搜索接口开发实战:从 CNN 特征提取到商品匹配(附避坑手册 + 可复用代码)
本文详解淘宝图片搜索接口开发全流程,涵盖CNN特征提取、商品匹配、参数配置及400/429等高频报错解决方案,附合规避坑指南与可复用代码,助你高效实现图像搜商品功能。
|
4月前
|
机器学习/深度学习 资源调度 算法
遗传算法模型深度解析与实战应用
摘要 遗传算法(GA)作为一种受生物进化启发的优化算法,在复杂问题求解中展现出独特优势。本文系统介绍了GA的核心理论、实现细节和应用经验。算法通过模拟自然选择机制,利用选择、交叉、变异三大操作在解空间中进行全局搜索。与梯度下降等传统方法相比,GA不依赖目标函数的连续性或可微性,特别适合处理离散优化、多目标优化等复杂问题。文中详细阐述了染色体编码、适应度函数设计、遗传操作实现等关键技术,并提供了Python代码实现示例。实践表明,GA的成功应用关键在于平衡探索与开发,通过精心调参维持种群多样性同时确保收敛效率
|
3月前
|
存储 人工智能 算法
从零掌握贪心算法Java版:LeetCode 10题实战解析(上)
在算法世界里,有一种思想如同生活中的"见好就收"——每次做出当前看来最优的选择,寄希望于通过局部最优达成全局最优。这种思想就是贪心算法,它以其简洁高效的特点,成为解决最优问题的利器。今天我们就来系统学习贪心算法的核心思想,并通过10道LeetCode经典题目实战演练,带你掌握这种"步步为营"的解题思维。
|
4月前
|
机器学习/深度学习 人工智能 算法
当AI提示词遇见精密算法:TimeGuessr如何用数学魔法打造文化游戏新体验
TimeGuessr融合AI与历史文化,首创时间与空间双维度评分体系,结合分段惩罚、Haversine距离计算与加权算法,辅以连击、速度与完美奖励机制,实现公平且富挑战性的游戏体验。
|
4月前
|
机器学习/深度学习 边缘计算 人工智能
粒子群算法模型深度解析与实战应用
蒋星熠Jaxonic是一位深耕智能优化算法领域多年的技术探索者,专注于粒子群优化(PSO)算法的研究与应用。他深入剖析了PSO的数学模型、核心公式及实现方法,并通过大量实践验证了其在神经网络优化、工程设计等复杂问题上的卓越性能。本文全面展示了PSO的理论基础、改进策略与前沿发展方向,为读者提供了一份详尽的技术指南。
粒子群算法模型深度解析与实战应用
|
4月前
|
机器学习/深度学习 传感器 数据采集
基于贝叶斯优化CNN-LSTM混合神经网络预测(Matlab代码实现)
基于贝叶斯优化CNN-LSTM混合神经网络预测(Matlab代码实现)
810 0
|
4月前
|
机器学习/深度学习 传感器 数据采集
【故障识别】基于CNN-SVM卷积神经网络结合支持向量机的数据分类预测研究(Matlab代码实现)
【故障识别】基于CNN-SVM卷积神经网络结合支持向量机的数据分类预测研究(Matlab代码实现)
343 0

热门文章

最新文章