PyTorch 2.2 Official Tutorials, Chinese Edition (Part 7) (3)
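In reinforcement learning, an environment is usually how we refer to a simulator or a control system. Various libraries provide simulation environments for reinforcement learning, including Gymnasium (previously OpenAI Gym), DeepMind Control Suite, and others. As a general-purpose library, TorchRL aims to provide an interchangeable interface to a large number of RL simulators, so that you can easily swap one environment for another. For example, a wrapped gym environment can be created with very few characters: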
base_env = GymEnv("InvertedDoublePendulum-v4", device=device, frame_skip=frame_skip)
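There are a few things to note in this code. First, we created the environment by calling the GymEnv wrapper. If extra keyword arguments are passed, they are forwarded to the gym.make method, which covers the most common environment construction commands. Alternatively, one could create a gym environment directly with gym.make(env_name, **kwargs) and wrap it in a GymWrapper class.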
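There is also the device argument: for gym, it only controls the device on which input actions and observed states are stored, while execution always happens on the CPU. The reason is simply that gym does not support on-device execution unless specified otherwise. For other libraries, we have control over the execution device and, as far as possible, we keep the storage and execution backends consistent.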
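We will append some transforms to our environment to prepare the data for the policy. In Gym, this is usually achieved with wrappers. TorchRL takes a different approach, closer to other pytorch domain libraries, by using transforms. To add transforms to an environment, simply wrap it in a TransformedEnv instance together with the sequence of transforms.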
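As we will see later, many of TorchRL's classes rely on TensorDict to communicate. You can think of it as a python dictionary with some extra tensor features. In practice, this means that many of the modules we will work with need to be told which keys to read (in_keys) and which keys to write (out_keys) in the tensordict they will receive. Usually, if out_keys is omitted, the in_keys entries are assumed to be updated in place. For our transforms, the only entry we are interested in is called "observation".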
env = TransformedEnv(
    base_env,
    Compose(
        # normalize observations
        ObservationNorm(in_keys=["observation"]),
        DoubleToFloat(in_keys=["observation"]),
        StepCounter(),
    ),
)
env.transform[0].init_stats(num_iter=1000, reduce_dim=0, cat_dim=0)
print("normalization constant shape:", env.transform[0].loc.shape)
normalization constant shape: torch.Size([11])
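To make the role of the normalization statistics more concrete, here is a minimal, dependency-free sketch of the underlying idea: gather a number of observations, compute one location and one scale per observation dimension, then standardize. The helper names `compute_stats` and `normalize` are hypothetical, the random samples stand in for real environment steps, and this is not TorchRL's implementation. It only illustrates why there is one normalization constant per observation dimension (11 here).

```python
import random
import statistics


def compute_stats(observations):
    """Per-dimension mean and standard deviation over a list of observation vectors."""
    columns = list(zip(*observations))  # transpose: one tuple per dimension
    loc = [statistics.fmean(col) for col in columns]
    scale = [statistics.pstdev(col) for col in columns]
    return loc, scale


def normalize(obs, loc, scale, eps=1e-6):
    """Standardize a single observation with precomputed statistics."""
    return [(x - m) / max(s, eps) for x, m, s in zip(obs, loc, scale)]


random.seed(0)
# Stand-in for 1000 random environment steps with 11-dimensional observations.
samples = [[random.gauss(5.0, 2.0) for _ in range(11)] for _ in range(1000)]
loc, scale = compute_stats(samples)
normalized = [normalize(obs, loc, scale) for obs in samples]
print("number of normalization constants:", len(loc))
```

After this step, each dimension of the normalized data has approximately zero mean and unit standard deviation, which tends to make learning easier for the policy and value networks.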
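An environment is defined not only by its simulator and transforms, but also by a set of metadata describing what can be expected during its execution. For efficiency reasons, TorchRL is quite strict about environment specs, but you can easily check that your environment specs are adequate. In our example, the GymWrapper class already takes care of setting the proper specs for the environment.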
print("observation_spec:", env.observation_spec)
print("reward_spec:", env.reward_spec)
print("input_spec:", env.input_spec)
print("action_spec (as defined by input_spec):", env.action_spec)
# sanity-check the specs against a short rollout
check_env_specs(env)
observation_spec: CompositeSpec( observation: UnboundedContinuousTensorSpec( shape=torch.Size([11]), space=None, device=cuda:0, dtype=torch.float32, domain=continuous), step_count: BoundedTensorSpec( shape=torch.Size([1]), space=ContinuousBox( low=Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.int64, contiguous=True), high=Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.int64, contiguous=True)), device=cuda:0, dtype=torch.int64, domain=continuous), device=cuda:0, shape=torch.Size([])) reward_spec: UnboundedContinuousTensorSpec( shape=torch.Size([1]), space=ContinuousBox( low=Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.float32, contiguous=True), high=Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.float32, contiguous=True)), device=cuda:0, dtype=torch.float32, domain=continuous) input_spec: CompositeSpec( full_state_spec: CompositeSpec( step_count: BoundedTensorSpec( shape=torch.Size([1]), space=ContinuousBox( low=Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.int64, contiguous=True), high=Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.int64, contiguous=True)), device=cuda:0, dtype=torch.int64, domain=continuous), device=cuda:0, shape=torch.Size([])), full_action_spec: CompositeSpec( action: BoundedTensorSpec( shape=torch.Size([1]), space=ContinuousBox( low=Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.float32, contiguous=True), high=Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.float32, contiguous=True)), device=cuda:0, dtype=torch.float32, domain=continuous), device=cuda:0, shape=torch.Size([])), device=cuda:0, shape=torch.Size([])) action_spec (as defined by input_spec): BoundedTensorSpec( shape=torch.Size([1]), space=ContinuousBox( low=Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.float32, contiguous=True), high=Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.float32, contiguous=True)), device=cuda:0, dtype=torch.float32, domain=continuous)
check_env_specs succeeded!
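For fun, let's see what a simple random rollout looks like. You can call env.rollout(n_steps) to get an overview of the environment's inputs and outputs. Actions are automatically sampled from the action spec domain, so you do not need to design a random sampler.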
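Typically, at each step, an RL environment receives an action as input and outputs an observation, a reward and a done state. The observation may be composite, meaning that it may consist of more than one tensor. This is not a problem for TorchRL, since the whole set of observations is automatically packed into the output TensorDict.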
rollout = env.rollout(3)
print("rollout of three steps:", rollout)
print("Shape of the rollout TensorDict:", rollout.batch_size)
rollout of three steps: TensorDict( fields={ action: Tensor(shape=torch.Size([3, 1]), device=cuda:0, dtype=torch.float32, is_shared=True), done: Tensor(shape=torch.Size([3, 1]), device=cuda:0, dtype=torch.bool, is_shared=True), next: TensorDict( fields={ done: Tensor(shape=torch.Size([3, 1]), device=cuda:0, dtype=torch.bool, is_shared=True), observation: Tensor(shape=torch.Size([3, 11]), device=cuda:0, dtype=torch.float32, is_shared=True), reward: Tensor(shape=torch.Size([3, 1]), device=cuda:0, dtype=torch.float32, is_shared=True), step_count: Tensor(shape=torch.Size([3, 1]), device=cuda:0, dtype=torch.int64, is_shared=True), terminated: Tensor(shape=torch.Size([3, 1]), device=cuda:0, dtype=torch.bool, is_shared=True), truncated: Tensor(shape=torch.Size([3, 1]), device=cuda:0, dtype=torch.bool, is_shared=True)}, batch_size=torch.Size([3]), device=cuda:0, is_shared=True), observation: Tensor(shape=torch.Size([3, 11]), device=cuda:0, dtype=torch.float32, is_shared=True), step_count: Tensor(shape=torch.Size([3, 1]), device=cuda:0, dtype=torch.int64, is_shared=True), terminated: Tensor(shape=torch.Size([3, 1]), device=cuda:0, dtype=torch.bool, is_shared=True), truncated: Tensor(shape=torch.Size([3, 1]), device=cuda:0, dtype=torch.bool, is_shared=True)}, batch_size=torch.Size([3]), device=cuda:0, is_shared=True) Shape of the rollout TensorDict: torch.Size([3])
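The "next" entry points to the data that comes after the current step. In most cases, the "next" data at time t matches the data at time t+1.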
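The layout of the rollout data can be illustrated with plain dictionaries. The sketch below uses a hypothetical toy counter environment (`toy_rollout` is not a TorchRL function) and stores each step with a nested "next" entry, mirroring the structure printed above. It assumes no special transforms are applied, so the "next" observation at step t is simply the root observation at step t + 1.

```python
def toy_rollout(n_steps):
    """Roll out a toy counter environment and store data in a TorchRL-like layout."""
    observation = 0
    steps = []
    for _ in range(n_steps):
        action = 1  # a fixed action; a real rollout would sample it
        next_observation = observation + action
        steps.append(
            {
                "observation": observation,
                "action": action,
                "next": {
                    "observation": next_observation,
                    "reward": float(next_observation),
                },
            }
        )
        observation = next_observation
    return steps


rollout = toy_rollout(3)
for t in range(len(rollout) - 1):
    # The "next" observation at time t is the root observation at time t + 1.
    assert rollout[t]["next"]["observation"] == rollout[t + 1]["observation"]
print([step["observation"] for step in rollout])  # [0, 1, 2]
print([step["next"]["observation"] for step in rollout])  # [1, 2, 3]
```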
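PPO uses a stochastic policy to handle exploration. This means that our neural network has to output the parameters of a distribution, rather than a single value corresponding to the action taken.
Since the data is continuous, we use a Tanh-Normal distribution to respect the boundaries of the action space. TorchRL provides such a distribution, and the only thing we need to take care of is building a neural network that outputs the right number of parameters for the policy (a location, or mean, and a scale):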
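- Define a neural network that maps the observation to 2 * D_action outputs.
- Append a NormalParamExtractor to extract a location and a scale (for example, it splits the input into two equal parts and applies a positive transformation to the scale parameter).
- Create a probabilistic module that can generate this distribution and sample from it.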
actor_net = nn.Sequential(
    nn.LazyLinear(num_cells, device=device),
    nn.Tanh(),
    nn.LazyLinear(num_cells, device=device),
    nn.Tanh(),
    nn.LazyLinear(num_cells, device=device),
    nn.Tanh(),
    nn.LazyLinear(2 * env.action_spec.shape[-1], device=device),
    NormalParamExtractor(),
)
/opt/conda/envs/py_3.10/lib/python3.10/site-packages/torch/nn/modules/ UserWarning: Lazy modules are a new feature under heavy development so changes to the API or functionality can happen at any moment.
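The parameter-extraction step described above can be sketched without any dependency. The function below splits a flat output vector into two halves and maps the second half to positive values. Using softplus as the positive transformation is an assumption made for illustration (the mapping actually used by NormalParamExtractor may differ), and `extract_normal_params` is a hypothetical helper name.

```python
import math


def softplus(x):
    """Numerically stable softplus, log(1 + exp(x)), which is always positive."""
    return max(x, 0.0) + math.log1p(math.exp(-abs(x)))


def extract_normal_params(outputs, min_scale=1e-4):
    """Split a flat output vector into (loc, scale) halves."""
    if len(outputs) % 2 != 0:
        raise ValueError("expected an even number of outputs")
    half = len(outputs) // 2
    loc = list(outputs[:half])
    # The positive transform keeps every scale strictly above zero.
    scale = [max(softplus(x), min_scale) for x in outputs[half:]]
    return loc, scale


# A network with D_action = 2 would produce 4 raw outputs.
loc, scale = extract_normal_params([0.3, -1.2, 0.0, -5.0])
print("loc:", loc)
print("scale:", scale)
```

This is why the last linear layer has 2 * D_action outputs: half of them become the location and the other half become the scale.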
policy_module = TensorDictModule(
    actor_net, in_keys=["observation"], out_keys=["loc", "scale"]
)
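The in_keys / out_keys convention can be illustrated with a plain dictionary. The sketch below is a simplified stand-in, not the TensorDictModule implementation: `KeyedModule` and `toy_policy_head` are hypothetical names, and a regular python dict plays the role of the tensordict.

```python
class KeyedModule:
    """Wrap a function so it reads inputs from and writes outputs to a dict."""

    def __init__(self, fn, in_keys, out_keys):
        self.fn = fn
        self.in_keys = in_keys
        self.out_keys = out_keys

    def __call__(self, data):
        # Read the requested entries, call the function, write the results back.
        outputs = self.fn(*(data[key] for key in self.in_keys))
        if not isinstance(outputs, tuple):
            outputs = (outputs,)
        for key, value in zip(self.out_keys, outputs):
            data[key] = value
        return data


def toy_policy_head(observation):
    """Hypothetical stand-in for a network: returns a (loc, scale) pair."""
    return sum(observation) / len(observation), 1.0


module = KeyedModule(toy_policy_head, in_keys=["observation"], out_keys=["loc", "scale"])
data = module({"observation": [1.0, 2.0, 3.0]})
print(sorted(data))  # ['loc', 'observation', 'scale']
```

The benefit of this design is that modules can be chained without knowing about each other: each one only needs to agree on the names of the entries it reads and writes.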
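ProbabilisticActor also accepts in_keys of type Dict[str, str], where each key-value pair indicates which tensordict entry should be used for which keyword argument of the distribution.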
policy_module = ProbabilisticActor(
    module=policy_module,
    spec=env.action_spec,
    in_keys=["loc", "scale"],
    distribution_class=TanhNormal,
    distribution_kwargs={
        # bounds of the action space, read from the environment spec
        "min": env.action_spec.space.low,
        "max": env.action_spec.space.high,
    },
    return_log_prob=True,
    # we'll need the log-prob for the numerator of the importance weights
)
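To see how a Tanh-Normal distribution respects the action bounds, here is a minimal sketch for a one-dimensional action. It is not the TorchRL TanhNormal class: the function names are hypothetical, and the log-probability uses the plain change-of-variables formula without the numerical safeguards a library would add (for very large |u| the term 1 - tanh(u)^2 underflows).

```python
import math
import random


def sample_tanh_normal(loc, scale, low, high, rng):
    """Sample u ~ Normal(loc, scale), squash with tanh, rescale to [low, high]."""
    u = rng.gauss(loc, scale)
    action = low + (high - low) * (math.tanh(u) + 1.0) / 2.0
    return action, u


def tanh_normal_log_prob(u, loc, scale, low, high):
    """Log-density of the squashed action, via the change-of-variables formula."""
    normal_log_prob = (
        -0.5 * ((u - loc) / scale) ** 2
        - math.log(scale)
        - 0.5 * math.log(2.0 * math.pi)
    )
    # |d action / d u| = (high - low) / 2 * (1 - tanh(u)^2)
    log_jacobian = math.log((high - low) / 2.0) + math.log(1.0 - math.tanh(u) ** 2)
    return normal_log_prob - log_jacobian


rng = random.Random(0)
samples = [sample_tanh_normal(0.0, 1.0, -1.0, 1.0, rng) for _ in range(1000)]
log_probs = [tanh_normal_log_prob(u, 0.0, 1.0, -1.0, 1.0) for _, u in samples]
print("min action:", min(a for a, _ in samples))
print("max action:", max(a for a, _ in samples))
```

Every sampled action stays inside the bounds regardless of the location and scale produced by the network, and the log-probability remains available for the importance weights used by PPO.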
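The value network is a crucial component of the PPO algorithm, even though it is not used at inference time. This module reads the observations and returns an estimate of the discounted return for the trajectory that follows. This allows us to amortize learning by relying on a utility estimate that is learned on the fly during training. Our value network has the same structure as the policy, but for simplicity we give it its own set of parameters.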
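The quantity the value network tries to predict, the discounted return, can be computed for a known reward sequence as follows. This is a small illustrative helper (`discounted_return` is a hypothetical name), not part of TorchRL.

```python
def discounted_return(rewards, gamma):
    """Sum of rewards weighted by gamma**k, accumulated backwards in time."""
    total = 0.0
    for reward in reversed(rewards):
        total = reward + gamma * total
    return total


print(discounted_return([1.0, 1.0, 1.0], gamma=0.5))  # 1 + 0.5 + 0.25 = 1.75
```

During training, the true future rewards are not known in advance, which is why a learned estimate is needed.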
value_net = nn.Sequential(
    nn.LazyLinear(num_cells, device=device),
    nn.Tanh(),
    nn.LazyLinear(num_cells, device=device),
    nn.Tanh(),
    nn.LazyLinear(num_cells, device=device),
    nn.Tanh(),
    nn.LazyLinear(1, device=device),
)

value_module = ValueOperator(
    module=value_net,
    in_keys=["observation"],
)
print("Running policy:", policy_module(env.reset()))
print("Running value:", value_module(env.reset()))
Running policy: TensorDict( fields={ action: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.float32, is_shared=True), done: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.bool, is_shared=True), loc: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.float32, is_shared=True), observation: Tensor(shape=torch.Size([11]), device=cuda:0, dtype=torch.float32, is_shared=True), sample_log_prob: Tensor(shape=torch.Size([]), device=cuda:0, dtype=torch.float32, is_shared=True), scale: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.float32, is_shared=True), step_count: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.int64, is_shared=True), terminated: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.bool, is_shared=True), truncated: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.bool, is_shared=True)}, batch_size=torch.Size([]), device=cuda:0, is_shared=True) Running value: TensorDict( fields={ done: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.bool, is_shared=True), observation: Tensor(shape=torch.Size([11]), device=cuda:0, dtype=torch.float32, is_shared=True), state_value: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.float32, is_shared=True), step_count: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.int64, is_shared=True), terminated: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.bool, is_shared=True), truncated: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.bool, is_shared=True)}, batch_size=torch.Size([]), device=cuda:0, is_shared=True)
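TorchRL provides a set of DataCollector classes. Briefly, these classes execute three operations: reset an environment, compute an action given the latest observation, and execute a step in the environment. The last two steps are repeated until the environment signals a stop (or reaches a done state).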
collector = SyncDataCollector(
    env,
    policy_module,
    frames_per_batch=frames_per_batch,
    total_frames=total_frames,
    split_trajs=False,
    device=device,
)
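The reset / act / step cycle of a collector can be sketched as a python generator. The code below is a simplified illustration under stated assumptions: `collect`, `toy_reset` and `toy_step` are hypothetical names, the toy environment is a counter that terminates at 5, and transitions are stored as plain tuples instead of tensordicts.

```python
import random


def collect(env_step, env_reset, policy, frames_per_batch, total_frames):
    """Yield lists of transitions until total_frames have been collected."""
    observation = env_reset()
    collected = 0
    while collected < total_frames:
        batch = []
        for _ in range(frames_per_batch):
            action = policy(observation)
            next_observation, reward, done = env_step(observation, action)
            batch.append((observation, action, reward, done))
            # Reset when the episode ends, otherwise carry on from the new state.
            observation = env_reset() if done else next_observation
        collected += len(batch)
        yield batch


def toy_reset():
    return 0


def toy_step(observation, action):
    next_observation = observation + action
    return next_observation, 1.0, next_observation >= 5


rng = random.Random(0)
batches = list(collect(toy_step, toy_reset, lambda obs: rng.choice([0, 1]), 10, 30))
print(len(batches), [len(b) for b in batches])  # 3 [10, 10, 10]
```

Iterating over the generator yields fixed-size batches, which is the same pattern the training loop relies on when it iterates over the collector.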
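Replay buffers are a common building block of off-policy RL algorithms. In on-policy contexts, the replay buffer is refilled every time a batch of data is collected, and its data is consumed repeatedly for a certain number of epochs.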
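TorchRL's replay buffers are built using a common container, ReplayBuffer, which takes the components of the buffer as arguments: a storage, a writer, a sampler and possibly some transforms. Only the storage (which indicates the replay buffer capacity) is mandatory. We also specify a sampler without repetition to avoid sampling the same item multiple times within one epoch. Using a replay buffer is not mandatory for PPO, since we could simply sample sub-batches from the collected batch, but using these classes lets us build the inner training loop in a reproducible way.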
replay_buffer = ReplayBuffer(
    storage=LazyTensorStorage(frames_per_batch),
    sampler=SamplerWithoutReplacement(),
)
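The effect of sampling without repetition can be shown with a short, dependency-free sketch. The class below is a hypothetical illustration rather than TorchRL's SamplerWithoutReplacement: it serves indices from a shuffled permutation so that each stored item is drawn exactly once per pass over the storage.

```python
import random


class WithoutReplacementSampler:
    """Serve shuffled indices so each item is seen once per pass over the storage."""

    def __init__(self, size, rng):
        self.size = size
        self.rng = rng
        self._pending = []

    def sample(self, batch_size):
        indices = []
        while len(indices) < batch_size:
            if not self._pending:
                # Start a new pass with a fresh permutation.
                self._pending = list(range(self.size))
                self.rng.shuffle(self._pending)
            indices.append(self._pending.pop())
        return indices


sampler = WithoutReplacementSampler(size=8, rng=random.Random(0))
first_pass = sampler.sample(4) + sampler.sample(4)
print(sorted(first_pass))  # [0, 1, 2, 3, 4, 5, 6, 7]
```

With a storage of frames_per_batch items and sub-batches of a fixed size, this guarantees that every collected frame contributes exactly once per epoch.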
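For convenience, the PPO loss can be imported directly from TorchRL using the ClipPPOLoss class. This is the easiest way to use PPO: it hides the mathematical operations of PPO and the control flow that goes with them.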
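For readers curious about the math that is hidden, the core of the clipped PPO objective for a single sample can be sketched as below. This is a simplified illustration, not the ClipPPOLoss implementation: `clipped_surrogate` is a hypothetical helper, and the critic and entropy terms are left out.

```python
import math


def clipped_surrogate(log_prob_new, log_prob_old, advantage, clip_epsilon):
    """PPO clipped objective for one sample (a quantity to be maximized)."""
    ratio = math.exp(log_prob_new - log_prob_old)  # importance weight
    clipped_ratio = min(max(ratio, 1.0 - clip_epsilon), 1.0 + clip_epsilon)
    # Take the more pessimistic of the unclipped and clipped estimates.
    return min(ratio * advantage, clipped_ratio * advantage)


# Ratio of 1.5 with a positive advantage: the gain is capped at (1 + 0.2) * 2.0.
print(clipped_surrogate(math.log(1.5), 0.0, advantage=2.0, clip_epsilon=0.2))
# Ratio of 0.5 with a negative advantage: the clipped term (1 - 0.2) * -2.0 is used.
print(clipped_surrogate(math.log(0.5), 0.0, advantage=-2.0, clip_epsilon=0.2))
```

The clipping removes the incentive to move the new policy far away from the policy that collected the data, which is what allows several optimization epochs on the same batch.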
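PPO requires some "advantage estimation" to be computed. In short, an advantage is a value that reflects an expectation over the return while dealing with the bias / variance tradeoff. To compute the advantage, one only needs to (1) build the advantage module, which uses our value operator, and (2) pass each batch of data through it before each epoch. The GAE module will update the input tensordict with new "advantage" and "value_target" entries.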
advantage_module = GAE(
    gamma=gamma, lmbda=lmbda, value_network=value_module, average_gae=True
)

loss_module = ClipPPOLoss(
    actor=policy_module,
    critic=value_module,
    advantage_key="advantage",
    clip_epsilon=clip_epsilon,
    entropy_bonus=bool(entropy_eps),
    entropy_coef=entropy_eps,
    # these keys match by default but we set this for completeness
    value_target_key=advantage_module.value_target_key,
    critic_coef=1.0,
    gamma=0.99,
    loss_critic_type="smooth_l1",
)

optim = torch.optim.Adam(loss_module.parameters(), lr)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optim, total_frames // frames_per_batch, 0.0
)
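As a rough illustration of what a generalized advantage estimator computes, here is a minimal sketch for a single trajectory stored in python lists. It follows the standard GAE recursion and is not TorchRL's GAE module: the function name and argument layout are assumptions, and the advantage standardization enabled by average_gae=True is deliberately left out.

```python
def gae(rewards, values, next_values, dones, gamma, lmbda):
    """Return (advantages, value_targets) for one trajectory."""
    advantages = [0.0] * len(rewards)
    running = 0.0
    for t in reversed(range(len(rewards))):
        not_done = 0.0 if dones[t] else 1.0
        # One-step temporal-difference error.
        delta = rewards[t] + gamma * next_values[t] * not_done - values[t]
        # Exponentially weighted sum of future TD errors.
        running = delta + gamma * lmbda * not_done * running
        advantages[t] = running
    value_targets = [a + v for a, v in zip(advantages, values)]
    return advantages, value_targets


advantages, value_targets = gae(
    rewards=[1.0, 1.0, 1.0],
    values=[0.0, 0.0, 0.0],
    next_values=[0.0, 0.0, 0.0],
    dones=[False, False, True],
    gamma=0.5,
    lmbda=1.0,
)
print(advantages)  # [1.75, 1.5, 1.0]
```

With lmbda=1.0 and a value estimate of zero everywhere, the advantage reduces to the discounted return, which is a useful sanity check. Smaller values of lmbda rely more on the value estimates, trading variance for bias.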
- Collect data
  - Compute advantage
    - Loop over the collected data to compute loss values
    - Back propagate
    - Optimize
    - Repeat
  - Repeat
- Repeat
logs = defaultdict(list)
pbar = tqdm(total=total_frames * frame_skip)
eval_str = ""

# We iterate over the collector until it reaches the total number of frames it was
# designed to collect:
for i, tensordict_data in enumerate(collector):
    # we now have a batch of data to work with. Let's learn something from it.
    for _ in range(num_epochs):
        # We'll need an "advantage" signal to make PPO work.
        # We re-compute it at each epoch as its value depends on the value
        # network which is updated in the inner loop.
        advantage_module(tensordict_data)
        data_view = tensordict_data.reshape(-1)
        replay_buffer.extend(data_view.cpu())
        for _ in range(frames_per_batch // sub_batch_size):
            subdata = replay_buffer.sample(sub_batch_size)
            # the buffer stores CPU copies, so move the sub-batch back to the device
            loss_vals = loss_module(subdata.to(device))
            loss_value = (
                loss_vals["loss_objective"]
                + loss_vals["loss_critic"]
                + loss_vals["loss_entropy"]
            )

            # Optimization: backward, grad clipping and optimization step
            loss_value.backward()
            # this is not strictly mandatory but it's good practice to keep
            # your gradient norm bounded
            torch.nn.utils.clip_grad_norm_(loss_module.parameters(), max_grad_norm)
            optim.step()
            optim.zero_grad()

    logs["reward"].append(tensordict_data["next", "reward"].mean().item())
    pbar.update(tensordict_data.numel() * frame_skip)
    cum_reward_str = (
        f"average reward={logs['reward'][-1]: 4.4f} (init={logs['reward'][0]: 4.4f})"
    )
    logs["step_count"].append(tensordict_data["step_count"].max().item())
    stepcount_str = f"step count (max): {logs['step_count'][-1]}"
    logs["lr"].append(optim.param_groups[0]["lr"])
    lr_str = f"lr policy: {logs['lr'][-1]: 4.4f}"
    if i % 10 == 0:
        # We evaluate the policy once every 10 batches of data.
        # Evaluation is rather simple: execute the policy without exploration
        # (take the expected value of the action distribution) for a given
        # number of steps (1000, which is our ``env`` horizon).
        # The ``rollout`` method of the ``env`` can take a policy as argument:
        # it will then execute this policy at each step.
        with set_exploration_mode("mean"), torch.no_grad():
            # execute a rollout with the trained policy
            eval_rollout = env.rollout(1000, policy_module)
            logs["eval reward"].append(eval_rollout["next", "reward"].mean().item())
            logs["eval reward (sum)"].append(
                eval_rollout["next", "reward"].sum().item()
            )
            logs["eval step_count"].append(eval_rollout["step_count"].max().item())
            eval_str = (
                f"eval cumulative reward: {logs['eval reward (sum)'][-1]: 4.4f} "
                f"(init: {logs['eval reward (sum)'][0]: 4.4f}), "
                f"eval step-count: {logs['eval step_count'][-1]}"
            )
            del eval_rollout
    pbar.set_description(", ".join([eval_str, cum_reward_str, stepcount_str, lr_str]))

    # We're also using a learning rate scheduler. Like the gradient clipping,
    # this is a nice-to-have but nothing necessary for PPO to work.
    scheduler.step()
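The learning rate schedule used in the loop can be understood through its closed form. The sketch below computes a cosine annealing schedule in plain python. The base learning rate of 3e-4 and the horizon of 100 steps are hypothetical values chosen for illustration, and `cosine_annealing_lr` is not a PyTorch function.

```python
import math


def cosine_annealing_lr(base_lr, step, t_max, eta_min=0.0):
    """Closed-form cosine schedule from base_lr (step 0) down to eta_min (step t_max)."""
    return eta_min + 0.5 * (base_lr - eta_min) * (1.0 + math.cos(math.pi * step / t_max))


schedule = [cosine_annealing_lr(3e-4, step, t_max=100) for step in range(101)]
print(schedule[0], schedule[50], schedule[100])
```

The learning rate starts at its base value, passes through half of it at the midpoint, and decays smoothly to the minimum by the last batch, so late updates are smaller and less likely to destabilize a policy that is already performing well.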
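Before the cap of 1M steps is reached, the algorithm should have reached a maximum step count of 1000, which is the maximum number of steps before a trajectory is truncated.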
plt.figure(figsize=(10, 10))
plt.subplot(2, 2, 1)
plt.plot(logs["reward"])
plt.title("training rewards (average)")
plt.subplot(2, 2, 2)
plt.plot(logs["step_count"])
plt.title("Max step count (training)")
plt.subplot(2, 2, 3)
plt.plot(logs["eval reward (sum)"])
plt.title("Return (test)")
plt.subplot(2, 2, 4)
plt.plot(logs["eval step_count"])
plt.title("Max step count (test)")
- How to create and customize an environment with TorchRL;
- How to write a model and a loss function;
- How to set up a typical training loop.
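- From an efficiency perspective, we could run several simulations in parallel to speed up data collection. See TorchRL's ParallelEnv for more information.
- From a logging perspective, one could add a video-recording transform (such as torchrl.record.VideoRecorder) to the environment after requesting rendering.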
