Python 与 TensorFlow2 生成式 AI(五)(2)https://developer.aliyun.com/article/1512058
在 PyBullet Gym 上运行 GAIL
在本章的代码示例中,我们将训练一个虚拟代理在一个模拟环境中导航 - 在许多 RL 论文中,这个环境是使用 Mujoco 框架 (www.mujoco.org/
) 进行模拟的。Mujoco 代表 Multi joint dynamics with contacts - 这是一个物理“引擎”,可以让您创建一个人工代理(如摆锤或双足人形),其中“奖励”可能是通过模拟环境移动的能力。
尽管它是用于开发强化学习基准的流行框架,例如由研究小组 OpenAI 使用(请参阅github.com/openai/baselines
以了解其中一些实现),但它也是闭源的,并且需要许可证才能使用。对于我们的实验,我们将使用 PyBullet Gymperium (github.com/benelot/pybullet-gym
),这是一个用于模拟 Mujoco 环境中的代理的物理模拟器和导入代理的替代品。
图 12.5: 来自 Mujoco 模拟环境的示例(http://www.mujoco.org/)
要安装 Pybullet-Gym,您需要先安装 OpenAI Gym,使用以下命令:
git clone https://github.com/openai/gym.git cd gym pip install -e .
然后安装 Pybullet:
git clone https://github.com/benelot/pybullet-gym.git cd pybullet-gym pip install -e .
为了展示这个模拟环境是如何工作的,让我们创建一个“hopper”,这是您可以使用库实例化的许多虚拟代理之一:
import gym import pybulletgym env = gym.make('HopperMuJoCoEnv-v0') env.render("human") env.reset()
如果命令执行正确,我们将看到以下输出,即给出行走者当前观察(11 维向量)的数组。
图 12.6:行走者的观察向量
对 render("human")
的调用将创建一个窗口,显示“蹦床”,这是一个简单的单脚人物,在模拟的 3D 环境中移动(图 12.7):
图 12.7:PyGym 蹦床
我们可以运行几次原始的蹦床迭代,以了解其外观。在这个模拟中,我们最多进行 1,000 步,并使用弹出窗口进行可视化:
env.reset() for t in range(1000): action = env.action_space.sample() _, _, done, _ = env.step(action) env.render("human") if done: break
我们首先使用 reset()
清除环境。然后,在最多 1,000 个时间步长内,我们会对动作空间进行采样(例如,表示行走图形在虚拟环境中移动的 x、y 和 z 坐标)。然后,我们使用该动作获取更新的奖励和观察,并渲染结果,直到移动完成。
这个演示来自完全未经训练的蹦床。对于我们的 GAIL 实现,我们将需要一个已成功训练行走的蹦床,作为算法的“专家”轨迹样本。为此,我们将从 OpenAI 网站下载一组蹦床数据,网址为:
drive.google.com/drive/folders/1h3H4AY_ZBx08hz-Ct0Nxxus-V1melu1U
这些包含一组 NumPy 文件,例如deterministic.trpo.Hopper.0.00.npz
,其中包含使用前面我们讨论过的 GAIL 算法中的 TRPO 算法训练的增强学习代理的数据样本。
如果我们加载这些数据,我们还可以使用 Pybullet 模拟器进行可视化,但这次我们将看到专家的步骤,而不是随机基线代理:
import numpy as np mujoco_hopper_np = np.load('deterministic.trpo.Hopper.0.00.npz') for i_episode in range(20): observation = env.reset() episode = np.random.choice(mujoco_hopper_np['acs'].shape[0]) for t in range(1000): env.render("human") action = mujoco_hopper_np = \ np.load('deterministic.trpo.Hopper.0.00.npz')['acs'][episode][t] observation, reward, done, info = env.step(action) if done: print("Episode finished after {} timesteps".format(t+1)) break env.close()
此代码加载了预先训练的蹦床,初始化了虚拟环境,并在其中最多进行 1,000 步,在这些步骤中,动作(蹦床的下一个移动)是使用蹦床的训练策略函数确定的,根据该动作更新环境状态(蹦床的位置)。注意,这里的策略函数是确定性的,导致在任何给定时间 t 的任何给定动作都会产生相同的结果。您现在可以看到蹦床迈出了许多步:
图 12.8:由 TRPO 训练的专家策略的蹦床移动
让我们更仔细地看一下我们加载的这个 NumPy 对象中有哪些数据。您会注意到,格式 .npz
是一组压缩文件的 gzip 存档。我们可以通过使用对象 mujoco_hopper_np
的 files 参数来查看这些存档的名称:
print(mujoco_hopper_np.files)
得到:
['ep_rets', 'obs', 'rews', 'acs']
观察是 11 维对象,您可以通过查看 obs
的维度来验证:
print(mujoco_hopper_np['obs'].shape)
该数组有 1,500 个示例,每个 1,000 个时间步长,每个时间步长有 11 个表示不同物理量(铰链位置、扭矩等)的维度。蹦跶者任务的目标是尽可能快地向前移动,所以当代理学会向前移动时,奖励函数会更高。如果我们检查acs
数据,我们会发现它有三个维度,对应于三维空间中的点。这是一个连续的动作空间,不同于我们之前讨论的离散示例。
print(mujoco_hopper_np ['acs'].shape)
ep_rets
对应于在时间 t 执行动作的未来预测奖励,而奖励rews
是奖励函数的输出。
代理:演员-评论家网络
要创建我们的 GAIL 实现,首先我们需要指定一个代理。(29)这是演员-评论家架构,由两个网络组成:一个学习观察结果的“值”(评论家),另一个(演员)基于观察结果进行动作采样。这些网络可以是独立的,也可以共享参数;对于我们的实验,我们让它们共享隐藏层和输入层,但有单独的输出层(图 12.9)。
图 12.9:演员-评论家架构
注意,本章 GAIL 实现的代码基于github.com/fangihsiao/GAIL-Tensorflow/blob/master/tf_gail.ipynb
。
下面我们定义ActorCritic
类:
import tensorflow_probability as tfp import tensorflow as tf tfd = tfp.distributions class ActorCritic(tf.keras.Model): def __init__(self, name='actor_critic', dim_actions=3, num_layers=2, input_shape=(11), num_units=100, **kwargs): super().__init__(name=name, **kwargs) self._num_layers = num_layers self._num_units = num_units self._dim_actions = dim_actions self._layers = list() for n, l in enumerate(range(self._num_layers)): self._layers.append(tf.keras.layers.Dense( self._num_units, activation=tf.nn.relu)) if n == 0: self._layers[-1].build(input_shape) else: self._layers[-1].build((num_units)) self._layers.append(tf.keras.layers.BatchNormalization()) self._value_output = tf.keras.layers.Dense(1,activation=None) self._value_output.build((num_units)) self._action_output = tf.keras.layers.Dense( self._dim_actions, activation=tf.nn.tanh) self._action_output.build((num_units)) self._action_dist_std = tf.Variable([1., 1, 1], trainable=False) self._action_dist = None def get_params(self): weights = [] for layer in self.layers: weights += layer.trainable_weights return weights+\ self._action_output.trainable_weights + \ self._value_output.trainable_weights + \ [self._action_dist_std] def call(self, inputs): x = self._layers0 for layer in self._layers[1:self._num_layers]: x = layer(x) return self._value_output(x) def log_prob(self, x): return self._action_dist.log_prob(x) def sample(self, inputs, output='action'): x = self._layers0 for layer in self._layers[1:self._num_layers]: x = layer(x) self._action_dist = tfd.Normal(self._action_output(x), [1,1,1]) if output == 'action': return self._action_dist.sample() elif output == 'entropy': return tf.reduce_mean(self._action_dist.entropy()) else: raise ValueError("unknown sample type: {}".format(output))
该类初始化一个接受输入状态和动作对并生成两个输出的网络-一个生成新动作(代表虚拟空间中蹦跶者的下一步移动的 3D 坐标)-演员-另一个生成值(表示蹦跶者在虚拟空间中移动的成功程度)-评论家。值输出是一个单一标量,随着蹦跶者运动质量的提高而增加,而动作是一个 3 单元向量,表示在 3D 空间中移动的每个坐标的均值和标准偏差。
因为我们的网络有多个输出,所以在设置输入层和初始化它们时需要小心。请注意,我们明确调用了两个输出层的build
,而不是让它们在前向传递中自动实例化,因为这将导致模型编译错误。我们还实例化了一个变量_action_dist_std
,包含动作维度的标准偏差,我们将在模型中用它来采样新坐标。我们还包括了BatchNormalization
层,以防止我们网络中的梯度爆炸或消失。(29)
我们还需要能够返回模型中可训练的参数,以便进行梯度计算,使用Actor-Critic
网络的get_params
方法:
def get_params(self): weights = [] for layer in self.layers: weights += layer.trainable_weights return weights+\ self._action_output.trainable_weights + \ self._value_output.trainable_weights + \ [self._action_dist_std]
在我们的前向传递中,我们计算了评论家的输出:
def call(self, inputs): x = self._layers0 for layer in self._layers[1:self._num_layers]: x = layer(x) return self._value_output(x)
为了从 Actor 中抽样新的动作(3D 移动),我们使用参数 'action'
运行样本函数 - 如果我们提供 'entropy'
替代,则返回动作分布的熵:
def sample(self, inputs, output='action'): x = self._layers0 for layer in self._layers[1:self._num_layers]: x = layer(x) self._action_dist = tfd.Normal(self._action_output(x), [1,1,1]) if output == 'action': return self._action_dist.sample() elif output == 'entropy': return tf.reduce_mean(self._action_dist.entropy()) else: raise ValueError("unknown sample type: {}".format(output))
最后,我们需要能够返回 PPO 网络中动作分布的对数概率(用于我们的损失函数),如下所述:
def log_prob(self, x): return self._action_dist.log_prob(x)
我们的 IRL 代理 - 我们将使用 Proximal Policy Optimization (PPO) 策略更新,这是 2017 年发表的 TRPO 的改进²⁹ - 利用这个 Actor-Critic 网络作为“策略”函数。
class PPO(tf.keras.Model): def __init__(self, name='ppo', dim_actions=3, num_layers=2, num_units=100, eps=0.2, v_coeff=0.5, ent_coeff=0.01, lr=3e-2, **kwargs): super().__init__(name=name, *kwargs) self._dim_actions = dim_actions self._num_layers = num_layers self._num_units = num_units self._eps = eps self._v_coeff = v_coeff self._ent_coeff = ent_coeff self._policy = ActorCritic(num_layers=self._num_layers, num_units=self._num_units, dim_actions=self._dim_actions) self._new_policy = ActorCritic(num_layers=self._num_layers, num_units=self._num_units, dim_actions=self._dim_actions) self._policy.compile(run_eagerly=True) self._new_policy.compile(run_eagerly=True) self._optimizer = tf.keras.optimizers.Adam(lr)
这个类初始化一个神经网络(_policy
)并为该网络的更新提供了一个占位符(_new_policy
),以便在算法的每一步中,我们都可以参照其相对于上一策略的改进来更新新策略。
在 train_policy
循环内的损失函数使用梯度下降算法进行优化,其中梯度的大小被限制在一个固定范围内(“剪辑”),以便大梯度不会导致损失函数(和权重)在训练轮次之间发生 drastical 改变:
def loss(self, actions, observations, advantages, returns): ratio = tf.exp(self._new_policy.log_prob(actions) - self._policy.log_prob(actions)) surr = ratio * advantages actor_loss = tf.reduce_mean( tf.minimum(surr, tf.clip_by_value(ratio, 1 - self._eps, 1 + self._eps) * advantages)) critic_loss = tf.reduce_mean(tf.square(returns - self._new_policy.call(observations))) return -1*actor_loss - self._ent_coeff * \ tf.reduce_mean(self._new_policy.sample(observations, 'entropy'))\ + self._v_coeff * critic_loss
在这个损失函数中,我们首先取旧策略(Actor-Critic 网络的当前参数)与潜在更新(新策略)之间的比率 - 两者的对数概率差的指数给出了一个比率(这是观察到的动作在每个网络的动作分布下的概率)。如果新的提议网络是一个改进(其参数更好地适应了观察到的动作序列),比率大于 1。否则,建议的质量保持不变(比率为 1)或比当前的 Actor-Critic 参数更差(比率小于 1)。
我们将这个比率乘以“优势”,优势是回报(我们之前描述的 Q 函数)与 Actor-Critic 现有状态的当前值之间的差异。在这个 GAIL 实现中,我们通过广义优势估计²⁹来计算优势,它使用 Q 函数的指数平滑估计,其中 gamma
(系数)和 tau
(指数)控制了未来奖励估计在未来的衰减程度相对于没有未来信息(tau
= 0)或未来数据与现在数据相比的重要性没有衰减(tau
= 1):
def compute_gae(next_value, rewards, masks, values, gamma=0.99, tau=0.95): values = values + [next_value] gae = 0 returns = [] for step in reversed(range(len(rewards))): delta = rewards[step] + gamma * values[step + 1] * \ masks[step] - values[step] gae = delta + gamma * tau * masks[step] * gae returns.insert(0, gae + values[step]) return returns
上面的损失函数然后使用优势乘以surr
("替代"项),并计算两个值——第一个是演员损失,它将由优势给出的损失项约束在给定范围内,由clip_by_value
函数表示。这可以防止新旧策略的概率比率的极端值(远小于 1 或大于 1)使损失函数不稳定。除此之外,我们还加上评论者的损失,评论者值与我们上面计算的优势函数之间的平方差。对演员和评论者函数以及行动概率分布的熵(它是否为位置子集分配高值,从而包含关于潜在行动分布的“信息”)进行加权求和,得到损失函数的总体策略质量作为目标。
请注意actor_loss
乘以负 1(因为它是旧策略概率与新策略概率之间的比率——因此如果它改善了,它大于 1,但损失函数应该最小化,因此一个更大的负值)。同样,熵项如果更大,它含有更多信息,我们也要取它的负值,因为我们要最小化损失函数。评论者的损失越接近 0,它就变得更好,所以我们保留这个项为正。
要使用这个损失函数,我们定义了一个名为train_policy
的自定义训练函数:
def train_policy(self, actions, observations, advantages, returns): params = self._new_policy.get_params() with tf.GradientTape(watch_accessed_variables=False) as g: g.watch(params) def loss(actions, observations, advantages, returns): ... cost = loss(actions, observations, advantages, returns) grads = g.gradient(cost, params) grads = [grad if grad is not None else tf.zeros_like(var) for var, grad in zip(params, grads)] self._optimizer.apply_gradients(zip(grads, params), experimental_aggregate_gradients=False)
我们使用get_params()
函数提取 PPO 策略网络的可训练参数,使用GradientTape
对它们进行“监视”,并使用上面的损失函数计算损失。此外,由于演员-评论者有两个输出(动作和值),只有一个输出(值)受到奖励更新的影响,我们可能会有不存在的梯度,所以我们用 0 替换任何空的梯度。
在我们上面描述的 GAIL 内循环的每一步中,我们还需要能够用深拷贝(创建一个具有相同值的新变量,而不是指向原始变量的指针)来替换旧策略为新策略:
def update_policy(self): self._policy = copy.deepcopy(self._new_policy)
最后,我们可以使用对演员-评论者策略网络的调用来获得价值(奖励)估计和采样新的行动:
def get_action(self, x): return self._new_policy.sample(x, output='action') def get_value(self, x): return self._new_policy.call(x)
鉴别器
通过我们的 PPO 算法,我们有了需要教会表现得像一个专家的“代理人”。我们可以从我们以“生成器”形式下载的 TRPO 训练示例的弹跳者中抽样。
现在我们只需要一个鉴别器网络,它旨在区分专家行为和我们正在训练的代理者:
class Discriminator(tf.keras.Model): def __init__(self, name='discriminator', dim_actions=3, num_layers=2, num_units=100, lr=3e-2, **kwargs): super().__init__(name=name, **kwargs) self._dim_actions = dim_actions self._num_layers = num_layers self._num_units = num_units self._layers = list() for l in range(self._num_layers): self._layers.append(tf.keras.layers.Dense( self._num_units, activation=tf.nn.relu)) self._layers.append(tf.keras.layers.BatchNormalization()) self._layers.append(tf.keras.layers.Dense(1,activation=None)) self._optimizer = tf.keras.optimizers.Adam(lr) return self._new_policy.call(x)
就像演员-评论者一样,这是一个 3 层神经网络,层与层之间应用了BatchNormalization
。它的单个输出指示输入的质量(就像演员-评论者的价值函数),当网络更像“专家”时,输出应该更低。请注意,为了使“奖励”输出与演员-评论者值输出的符号匹配,我们反转了鉴别器的符号,因为它对专家观察预测得更接近 0:
def get_reward(self, x): return -1 *tf.squeeze(tf.math.log(tf.sigmoid(self.call(x))))
此转换应用于我们之前为 Actor-Critic 网络看到的相同 call
函数。
def call(self, x): for layer in self._layers: x = layer(x) return x
训练和结果
训练网络时,我们应用一个损失函数,试图将专家(观察,动作)对分类为 0,并将代理(观察,动作)对分类为 1。当代理学会生成高质量的(观察,动作)对,类似于专家时,鉴别器将越来越难区分来自代理和专家的样本,并且也会将代理样本标记为 0:
def loss(self, x): expert_out, policy_out = tf.sigmoid(tf.split(self.call(x), num_or_size_splits=2, axis=0)) return (tf.nn.sigmoid_cross_entropy_with_logits(tf.ones_like(policy_out), policy_out) + tf.nn.sigmoid_cross_entropy_with_logits(tf.zeros_like(expert_out), expert_out))
与之前一样,我们通过 get_params()
提取网络的参数:
def get_params(self): weights = [] for layer in self.layers: weights += layer.trainable_weights return weights
然后我们使用 train_discriminator
将我们的损失函数应用于这些参数:
def train_discriminator(self, x): params = self.get_params() with tf.GradientTape(watch_accessed_variables=False) as g: g.watch(params) cost = self.loss(x) grads = g.gradient(cost, params) grads = [grad if grad is not None else tf.zeros_like(var) for var, grad in zip(params, grads)] self._optimizer.apply_gradients(zip(grads, params), experimental_aggregate_gradients=False)
最后,我们需要一个更新函数用于我们的 PPO 小批量步骤,在 GAIL 算法的每个内循环中从代理中随机采样观察:
def ppo_iteration(mini_batch_size, observations, actions, returns, advantages): batch_size = observations.shape[0] for _ in range(batch_size // mini_batch_size): rand_ids = np.random.randint(0, batch_size, mini_batch_size) yield (observations[rand_ids, :], actions[rand_ids, :], returns[rand_ids, :], advantages[rand_ids, :])
我们还希望能够绘制我们训练代理的进展,为此,我们将使用模型从环境中取样,并绘制平均奖励和鉴别器的性能(代理和专家的鉴别器奖励匹配程度):
from IPython.display import clear_output import matplotlib.pyplot as plt def plot(frame_idx, rewards, policy_ob_ac_rew, expert_ob_ac_rew): clear_output(True) plt.figure(figsize=(20,5)) plt.subplot(131) plt.title('frame %s. reward: %s' % (frame_idx, rewards[-1])) plt.ylabel('Agent Reward') plt.xlabel('Step in Training') plt.plot(rewards) plt.subplot(132) plt.title('frame %s.' % (frame_idx)) plt.plot(policy_ob_ac_rew) plt.plot(expert_ob_ac_rew) plt.legend(['Agent','Expert']) plt.xlabel('Steps in Test Simulation') plt.ylabel('Discriminator Reward') plt.show()
此函数生成两个图表,显示在图 12.10中。左侧是代理的一组测试观察的奖励,随着代理在移动跳跃器方面变得更好,奖励应该增加。右侧绘制了每个 n 步样本代理和专家跳跃器运动的鉴别器如何区分两者的情况(橙色和蓝色线是否重叠,这是最优的,或者彼此之间是否很远,这种情况下 GAIL 算法尚未收敛):
图 12.10:一系列测试观察的代理奖励(左),鉴别器奖励(右)。图由 plot() 生成。
测试样本是使用 test_env
函数生成的,该函数类似于我们上面看到的 Pybullet 仿真 - 它使用当前代理并计算当前策略下仿真的 n 步,返回平均奖励:
def test_env(model, vis=False): ob = env.reset() ob = tf.reshape(tf.convert_to_tensor(ob), (1,11)) done = False total_reward = 0 while not done: if vis: env.render() ac = model.get_action(ob)[0] ac = tf.reshape(tf.convert_to_tensor(ac), (3, 1)) next_ob, reward, done, _ = env.step(ac) ob = next_ob ob = tf.reshape(tf.convert_to_tensor(ob), (1,11)) total_reward += reward return total_reward
对于我们的主函数,我们将设置最大仿真步数和算法的超参数,包括 ADAM 优化器的学习率 (lr
),每个网络隐藏层中的单位数,每个步数在代理中运行每个跳跃仿真的步数 (num_steps
),在每个代理更新中选择的样本数量(小批量大小),在每个内部循环更新代理时运行的步数 (ppo_epochs
),算法中的总最大步数 (max_frames
),以及一个容器来保存我们上面展示如何绘制的外样本奖励估计 (test_rewards
):
ppo_hidden_size = 32 discriminator_hidden_size = 32 lr = 3e-4 num_steps = 1000 mini_batch_size = 50 ppo_epochs = 5 max_frames = 100000000 frame_idx = 0 test_rewards = []
首先,我们初始化鉴别器和 PPO 网络,设置内部循环计数器以进行代理更新 (i_update
),并设置 Pybullet 环境:
ob = env.reset() ppo = PPO(lr=lr, num_units=ppo_hidden_size) discriminator = Discriminator(lr=lr, num_units=discriminator_hidden_size) i_update = 0
在每一步,我们将使用当前策略计算一定数量的时间步,并创建这些观察、动作和奖励的列表。在固定的间隔内,我们将使用我们上面描述的函数绘制结果:
while frame_idx < max_frames: i_update += 1 values = [] obs = [] acs = [] rewards = [] masks = [] entropy = 0 for _ in range(num_steps): ob = tf.reshape(tf.convert_to_tensor(ob), (1,11)) ac = ppo.get_action(ob) ac = tf.reshape(tf.convert_to_tensor(ac), (3, 1)) next_ob, _, done, _ = env.step(ac) reward = discriminator.get_reward(np.concatenate([ob, tf.transpose(ac)], axis=1)) value = ppo.get_value(ob) values.append(value) rewards.append(reward) masks.append((1-done)) obs.append(ob) acs.append(np.transpose(ac)) ob = next_ob frame_idx += 1 if frame_idx % 1000 == 0 and i_update > 1: test_reward = np.mean([test_env(ppo) for _ in range(10)]) test_rewards.append(test_reward) plot(frame_idx, test_rewards, discriminator.get_reward(policy_ob_ac), discriminator.get_reward(expert_ob_ac)) next_ob = tf.reshape(tf.convert_to_tensor(next_ob), (1,11)) next_value = ppo.get_value(next_ob) returns = compute_gae(next_value, rewards, masks, values) returns = np.concatenate(returns) values = np.concatenate(values) obs = np.concatenate(obs) acs = np.concatenate(acs) advantages = returns - values