PokéLLMon 源码解析(五)(3)

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: PokéLLMon 源码解析(五)

PokéLLMon 源码解析(五)(2)https://developer.aliyun.com/article/1483727

.\PokeLLMon\poke_env\player\__init__.py

# 初始化 poke_env.player 模块
"""
# 导入并引入并发模块 POKE_LOOP
from poke_env.concurrency import POKE_LOOP
# 导入随机玩家、工具类
from poke_env.player import random_player, utils
# 导入基线玩家、简单启发式玩家
from poke_env.player.baselines import MaxBasePowerPlayer, SimpleHeuristicsPlayer
# 导入 GPT 玩家
from poke_env.player.gpt_player import LLMPlayer
# 导入 LLAMA 玩家
from poke_env.player.llama_player import LLAMAPlayer
# 导入战斗指令相关类
from poke_env.player.battle_order import (
    BattleOrder,
    DefaultBattleOrder,
    DoubleBattleOrder,
    ForfeitBattleOrder,
)
# 导入 OpenAI API 相关类
from poke_env.player.openai_api import ActType, ObsType, OpenAIGymEnv
# 导入玩家类
from poke_env.player.player import Player
# 导入随机玩家类
from poke_env.player.random_player import RandomPlayer
# 导入工具类中的函数
from poke_env.player.utils import (
    background_cross_evaluate,
    background_evaluate_player,
    cross_evaluate,
    evaluate_player,
)
# 导入 PS 客户端
from poke_env.ps_client import PSClient
# 导出的模块列表
__all__ = [
    "openai_api",
    "player",
    "random_player",
    "utils",
    "ActType",
    "ObsType",
    "ForfeitBattleOrder",
    "POKE_LOOP",
    "OpenAIGymEnv",
    "PSClient",
    "Player",
    "RandomPlayer",
    "cross_evaluate",
    "background_cross_evaluate",
    "background_evaluate_player",
    "evaluate_player",
    "BattleOrder",
    "DefaultBattleOrder",
    "DoubleBattleOrder",
    "MaxBasePowerPlayer",
    "SimpleHeuristicsPlayer",
]

.\PokeLLMon\poke_env\ps_client\account_configuration.py

# 该模块包含与玩家配置相关的对象
"""
# 导入必要的模块
from typing import Counter, NamedTuple, Optional
# 创建一个计数器对象,用于统计从玩家获取的配置信息
CONFIGURATION_FROM_PLAYER_COUNTER: Counter[str] = Counter()
# 定义一个命名元组对象,表示玩家配置。包含用户名和密码两个条目
class AccountConfiguration(NamedTuple):
    """Player configuration object. Represented with a tuple with two entries: username and
    password."""
    # 用户名
    username: str
    # 密码(可选)
    password: Optional[str]

.\PokeLLMon\poke_env\ps_client\ps_client.py

"""
这个模块定义了一个与 Showdown 服务器通信的基类。
"""
# 导入必要的库
import asyncio
import json
import logging
from asyncio import CancelledError, Event, Lock, create_task, sleep
from logging import Logger
from time import perf_counter
from typing import Any, List, Optional, Set
import requests
import websockets.client as ws
from websockets.exceptions import ConnectionClosedOK
from poke_env.concurrency import (
    POKE_LOOP,
    create_in_poke_loop,
    handle_threaded_coroutines,
)
from poke_env.exceptions import ShowdownException
from poke_env.ps_client.account_configuration import AccountConfiguration
from poke_env.ps_client.server_configuration import ServerConfiguration
# 定义 Pokemon Showdown 客户端类
class PSClient:
    """
    Pokemon Showdown 客户端。
    负责与 Showdown 服务器通信。还实现了一些用于基本任务的高级方法,如更改头像和低级消息处理。
    """
    def __init__(
        self,
        account_configuration: AccountConfiguration,
        *,
        avatar: Optional[int] = None,
        log_level: Optional[int] = None,
        server_configuration: ServerConfiguration,
        start_listening: bool = True,
        ping_interval: Optional[float] = 20.0,
        ping_timeout: Optional[float] = 20.0,
        """
        :param account_configuration: Account configuration.
        :type account_configuration: AccountConfiguration
        :param avatar: Player avatar id. Optional.
        :type avatar: int, optional
        :param log_level: The player's logger level.
        :type log_level: int. Defaults to logging's default level.
        :param server_configuration: Server configuration.
        :type server_configuration: ServerConfiguration
        :param start_listening: Whether to start listening to the server. Defaults to
            True.
        :type start_listening: bool
        :param ping_interval: How long between keepalive pings (Important for backend
            websockets). If None, disables keepalive entirely.
        :type ping_interval: float, optional
        :param ping_timeout: How long to wait for a timeout of a specific ping
            (important for backend websockets.
            Increase only if timeouts occur during runtime).
            If None pings will never time out.
        :type ping_timeout: float, optional
        """
        # 初始化活动任务集合
        self._active_tasks: Set[Any] = set()
        # 设置 ping_interval 和 ping_timeout
        self._ping_interval = ping_interval
        self._ping_timeout = ping_timeout
        # 设置服务器配置和账户配置
        self._server_configuration = server_configuration
        self._account_configuration = account_configuration
        # 设置玩家头像
        self._avatar = avatar
        # 创建登录事件和发送锁
        self._logged_in: Event = create_in_poke_loop(Event)
        self._sending_lock = create_in_poke_loop(Lock)
        # 初始化 websocket 和日志记录器
        self.websocket: ws.WebSocketClientProtocol
        self._logger: Logger = self._create_logger(log_level)
        # 如果需要开始监听服务器,则在 POKE_LOOP 线程安全地运行监听协程
        if start_listening:
            self._listening_coroutine = asyncio.run_coroutine_threadsafe(
                self.listen(), POKE_LOOP
            )
    # 异步方法,接受挑战,发送接受挑战的消息给指定用户名
    async def accept_challenge(self, username: str, packed_team: Optional[str]):
        # 断言当前用户已登录
        assert (
            self.logged_in.is_set()
        ), f"Expected player {self.username} to be logged in."
        # 设置队伍
        await self.set_team(packed_team)
        # 发送消息给指定用户名,接受挑战
        await self.send_message("/accept %s" % username)
    # 异步方法,发起挑战,发送挑战消息给指定用户名和格式
    async def challenge(self, username: str, format_: str, packed_team: Optional[str]):
        # 断言当前用户已登录
        assert (
            self.logged_in.is_set()
        ), f"Expected player {self.username} to be logged in."
        # 设置队伍
        await self.set_team(packed_team)
        # 发送挑战消息给指定用户名和格式
        await self.send_message(f"/challenge {username}, {format_}")
    # 创建日志记录器
    def _create_logger(self, log_level: Optional[int]) -> Logger:
        """Creates a logger for the client.
        Returns a Logger displaying asctime and the account's username before messages.
        :param log_level: The logger's level.
        :type log_level: int
        :return: The logger.
        :rtype: Logger
        """
        # 创建以用户名为名称的日志记录器
        logger = logging.getLogger(self.username)
        # 创建流处理器
        stream_handler = logging.StreamHandler()
        # 如果有指定日志级别,设置日志级别
        if log_level is not None:
            logger.setLevel(log_level)
        # 设置日志格式
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
        stream_handler.setFormatter(formatter)
        # 添加流处理器到日志记录器
        logger.addHandler(stream_handler)
        return logger
    # 异步方法,停止监听
    async def _stop_listening(self):
        # 关闭 WebSocket 连接
        await self.websocket.close()
    # 异步方法,更改玩家的头像
    async def change_avatar(self, avatar_id: Optional[int]):
        """Changes the player's avatar.
        :param avatar_id: The new avatar id. If None, nothing happens.
        :type avatar_id: int
        """
        # 等待用户登录
        await self.wait_for_login()
        # 如果有指定头像 id,发送更改头像的消息
        if avatar_id is not None:
            await self.send_message(f"/avatar {avatar_id}")
    # 异步方法,用于监听 showdown websocket 并分发消息以进行处理
    async def listen(self):
        # 记录日志,表示开始监听 showdown websocket
        self.logger.info("Starting listening to showdown websocket")
        try:
            # 使用 async with 连接到 websocket
            async with ws.connect(
                self.websocket_url,
                max_queue=None,
                ping_interval=self._ping_interval,
                ping_timeout=self._ping_timeout,
            ) as websocket:
                # 将 websocket 赋值给实例变量
                self.websocket = websocket
                # 遍历 websocket 接收到的消息
                async for message in websocket:
                    # 记录接收到的消息
                    self.logger.info("\033[92m\033[1m<<<\033[0m %s", message)
                    # 创建任务来处理接收到的消息
                    task = create_task(self._handle_message(str(message)))
                    # 将任务添加到活动任务集合中
                    self._active_tasks.add(task)
                    # 添加任务完成时的回调函数,从活动任务集合中移除任务
                    task.add_done_callback(self._active_tasks.discard)
        except ConnectionClosedOK:
            # 记录警告日志,表示 websocket 连接已关闭
            self.logger.warning(
                "Websocket connection with %s closed", self.websocket_url
            )
        except (CancelledError, RuntimeError) as e:
            # 记录严重错误日志,表示监听被中断
            self.logger.critical("Listen interrupted by %s", e)
        except Exception as e:
            # 记录异常日志
            self.logger.exception(e)
    # 异步方法,用于登录玩家,需要传入分割后的消息列表
    async def log_in(self, split_message: List[str]):
        """Log the player with specified username and password.
        Split message contains information sent by the server. This information is
        necessary to log in.
        :param split_message: Message received from the server that triggers logging in.
        :type split_message: List[str]
        """
        # 如果存在账户密码
        if self.account_configuration.password:
            # 发送登录请求,包括用户名、密码和服务器信息
            log_in_request = requests.post(
                self.server_configuration.authentication_url,
                data={
                    "act": "login",
                    "name": self.account_configuration.username,
                    "pass": self.account_configuration.password,
                    "challstr": split_message[2] + "%7C" + split_message[3],
                },
            )
            # 记录发送认证请求的信息
            self.logger.info("Sending authentication request")
            # 从返回的数据中获取认证信息
            assertion = json.loads(log_in_request.text[1:])["assertion"]
        else:
            # 如果不存在账户密码,则跳过认证请求
            self.logger.info("Bypassing authentication request")
            assertion = ""
        # 发送消息,包括用户名和认证信息
        await self.send_message(f"/trn {self.username},0,{assertion}")
        # 更改头像
        await self.change_avatar(self._avatar)
    # 异步方法,用于搜索排位赛游戏,需要传入比赛格式和打包的队伍信息
    async def search_ladder_game(self, format_: str, packed_team: Optional[str]):
        # 设置队伍信息
        await self.set_team(packed_team)
        # 发送搜索游戏消息,包括比赛格式
        await self.send_message(f"/search {format_}")
    # 异步方法,用于发送消息,可以指定房间和第二条消息
    async def send_message(
        self, message: str, room: str = "", message_2: Optional[str] = None
    ):
        """Sends a message to the specified room.
        `message_2` can be used to send a sequence of length 2.
        :param message: The message to send.
        :type message: str
        :param room: The room to which the message should be sent.
        :type room: str
        :param message_2: Second element of the sequence to be sent. Optional.
        :type message_2: str, optional
        """
        # 如果存在第二个消息,将消息和房间名以及第二个消息用竖线连接起来
        if message_2:
            to_send = "|".join([room, message, message_2])
        else:
            to_send = "|".join([room, message])
        # 发送消息
        await self.websocket.send(to_send)
    async def set_team(self, packed_team: Optional[str]):
        # 如果存在打包的团队信息,发送消息 "/utm {packed_team}"
        if packed_team:
            await self.send_message(f"/utm {packed_team}")
        else:
            # 否则发送消息 "/utm null"
            await self.send_message("/utm null")
    async def stop_listening(self):
        # 停止监听
        await handle_threaded_coroutines(self._stop_listening())
    async def wait_for_login(self, checking_interval: float = 0.001, wait_for: int = 5):
        start = perf_counter()
        # 在指定时间内等待登录
        while perf_counter() - start < wait_for:
            await sleep(checking_interval)
            if self.logged_in:
                return
        # 如果超时仍未登录,则抛出异常
        assert self.logged_in, f"Expected player {self.username} to be logged in."
    @property
    def account_configuration(self) -> AccountConfiguration:
        """The client's account configuration.
        :return: The client's account configuration.
        :rtype: AccountConfiguration
        """
        # 返回客户端的账户配置
        return self._account_configuration
    @property
    def logged_in(self) -> Event:
        """Event object associated with user login.
        :return: The logged-in event
        :rtype: Event
        """
        # 返回与用户登录相关的事件对象
        return self._logged_in
    @property
    def logger(self) -> Logger:
        """Logger associated with the player.
        :return: The logger.
        :rtype: Logger
        """
        # 返回与玩家相关的日志记录器
        return self._logger
    @property
    def server_configuration(self) -> ServerConfiguration:
        """获取客户端的服务器配置信息。
        :return: 客户端的服务器配置信息。
        :rtype: ServerConfiguration
        """
        return self._server_configuration
    @property
    def username(self) -> str:
        """玩家的用户名。
        :return: 玩家的用户名。
        :rtype: str
        """
        return self.account_configuration.username
    @property
    def websocket_url(self) -> str:
        """WebSocket 的 URL。
        它是从服务器 URL 派生而来。
        :return: WebSocket 的 URL。
        :rtype: str
        """
        return f"ws://{self.server_configuration.server_url}/showdown/websocket"

PokéLLMon 源码解析(五)(4)https://developer.aliyun.com/article/1483731

相关文章
|
1月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
69 2
|
14天前
|
PyTorch Shell API
Ascend Extension for PyTorch的源码解析
本文介绍了Ascend对PyTorch代码的适配过程,包括源码下载、编译步骤及常见问题,详细解析了torch-npu编译后的文件结构和三种实现昇腾NPU算子调用的方式:通过torch的register方式、定义算子方式和API重定向映射方式。这对于开发者理解和使用Ascend平台上的PyTorch具有重要指导意义。
|
18天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
47 12
|
1月前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
1月前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
48 3
|
2月前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
61 5
|
2月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
144 5
|
2月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
76 0
|
2月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
58 0
|
2月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
65 0

推荐镜像

更多