为集成LLM到测试平台提供更便捷的方式:为讯飞的LLM星火创建接入LangChain类(全部源代码)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
大数据开发治理平台 DataWorks,不限时长
简介: 为集成LLM到测试平台提供更便捷的方式:为讯飞的LLM星火创建接入LangChain类(全部源代码)

为集成LLM到测试平台提供更便捷的方式:为讯飞的LLM星火创建接入LangChain类(全部源代码)
LangChain是一个强大的框架,旨在帮助开发人员使用语言模型构建端到端的应用程序。它提供了一套工具、组件和接口,可简化创建由大型语言模型 (LLM) 和聊天模型提供支持的应用程序的过程。LangChain 可以轻松管理与语言模型的交互,将多个组件链接在一起,并集成额外的资源,例如 API 和数据库。

讯飞已经发布了v3的模型,提供一定份额的免费API的tokens,对于尝试学习以及一些简单尝试非常友好,最近学习LangChain,发现没有讯飞星火的集成,因此自己动手写了一个。

官方的访问API

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File    :   SparkApi.py
@Time    :   2023/10/18 11:01:37
@Author  :   CrissChan 
@Version :   1.0
@Site    :   https://blog.csdn.net/crisschan
@Desc    :   
'''


import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time

import websocket  # 使用websocket_client
answer = ""

class Ws_Param(object):
    # 初始化
    def __init__(self, APPID, APIKey, APISecret, Spark_url):
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.host = urlparse(Spark_url).netloc
        self.path = urlparse(Spark_url).path
        self.Spark_url = Spark_url

    # 生成url
    def create_url(self):
        # 生成RFC1123格式的时间戳
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        # 拼接字符串
        signature_origin = "host: " + self.host + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + self.path + " HTTP/1.1"

        # 进行hmac-sha256进行加密
        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()

        signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')

        authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'

        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')

        # 将请求的鉴权参数组合为字典
        v = {
            "authorization": authorization,
            "date": date,
            "host": self.host
        }
        # 拼接鉴权参数,生成url
        url = self.Spark_url + '?' + urlencode(v)
        # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
        return url


# 收到websocket错误的处理
def on_error(ws, error):
    print("### error:", error)


# 收到websocket关闭的处理
def on_close(ws,one,two):
    print(" ")


# 收到websocket连接建立的处理
def on_open(ws):
    thread.start_new_thread(run, (ws,))


def run(ws, *args):
    data = json.dumps(gen_params(appid=ws.appid, domain= ws.domain,question=ws.question))
    ws.send(data)


# 收到websocket消息的处理
def on_message(ws, message):
    # print(message)
    data = json.loads(message)
    # print(data)
    code = data['header']['code']
    if code != 0:
        print(f'请求错误: {code}, {data}')
        ws.close()
    else:
        choices = data["payload"]["choices"]
        status = choices["status"]
        content = choices["text"][0]["content"]
        print(content,end ="")
        # 尝试找到可以转成python对象的list结构
        # print(content)
        global answer
        answer += content

        #print(1)
        if status == 2:
            ws.close()


def gen_params(appid, domain,question):
    """
    通过appid和用户的提问来生成请参数
    """
    data = {
        "header": {
            "app_id": appid,
            "uid": "1234"
        },
        "parameter": {
            "chat": {
                "domain": domain,
                "random_threshold": 0.5,
                "temperature": 0.5,
                "max_tokens": 2048,
                "auditing": "default"
            }
        },
        "payload": {
            "message": {
                "text": question
            }
        }
    }
    return data


def main(appid, api_key, api_secret, Spark_url,domain, question):
    # print("星火:")
    wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
    # websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
    ws.appid = appid
    ws.question = question
    ws.domain = domain
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

SparkAPI和LangChain中间的一个调用层

下面代码是一个SparkAPI和LangChain的调用层,方便更好封装Spark的参数。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File    :   spark_middlerware.py
@Time    :   2023/11/01 18:44:52
@Author  :   CrissChan 
@Version :   1.0
@Site    :   https://blog.csdn.net/crisschan
@Desc    :   链接sparkapi的中间件,中间控制版本,token上限等
'''



import SparkApi
import os
from dotenv import load_dotenv, find_dotenv

#以下密钥信息从控制台获取
class SparkMiddleware(object):

    _=load_dotenv(find_dotenv())
    appid = os.getenv("SPARK_APP_ID")
    api_secret=os.getenv("SPARK_APP_SECRET")
    api_key=os.getenv("SPARK_APP_KEY")


    #用于配置大模型版本,默认“general/generalv2”
    # domain = "general"   # v1.5版本
    # domain = "generalv2"    # v2.0版本
    #domain = "generalv3"    # v3.0版本
    #云端环境的服务地址
    # Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat"  # v1.5环境的地址
    # Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat"  # v2.0环境的地址ws(s)://spark-api.xf-yun.com/v2.1/chat
    # Spark_url = "ws://spark-api.xf-yun.com/v3.1/chat"  # v3.0环境的地址ws(s)://spark-api.xf-yun.com/v3.1/chat
    # 定义了sparkdomain和url的dict,这样在输入的时候就可以自动匹配对应的版本地址
    domain_url = {"general":"ws://spark-api.xf-yun.com/v1.1/chat",
                  "generalv2":"ws://spark-api.xf-yun.com/v2.1/chat",
                  "generalv3":"ws://spark-api.xf-yun.com/v3.1/chat",
                  }

    text =[]

    '''
    @des     :spark middlerware的构造函数,创建一个和封装sparkapi调用的参数的中间层   
    @params  : 
              domain 代表需要调取spark的版本其中有三种值可选 "general"是v1.5版本,"generalv2"表示v2.0版本, "generalv3"表示v3.0版本,当前讯飞的星火就有三个版本
              role 代表角色,星火的有两个角色“user”表示是用户的问题,“assistant”表示AI的回复
    @return  :None

    '''
    def __init__(self,domain,role,content) -> None:
        self.text.clear
        self.__getText(role,content)
        SparkApi.main(self.appid,self.api_key,self.api_secret,self.domain_url[domain],domain,self.text)

        pass
    '''
    @des  :拼装成访问参数中的text需要的格式   
    @params  : role 代表角色,星火的有两个角色user表示是用户的问题,assistant表示AI的回复
               content是用户输入的问题
    @return  :None

    '''

    def __getText(self,role,content) -> None:

        jsoncon = {}
        jsoncon["role"] = role
        jsoncon["content"] = content
        self.text.append(jsoncon)
        # return self.text
        self.__checklen()


    '''
    @des  :获取这次传递给llm的prompt的长度

    @params  :None

    @return  :None

    '''
    def __getlength(self)-> None:
        length = 0
        for content in self.text:
            temp = content["content"]
            leng = len(temp)
            length += leng
        return length
    '''
    @des  :参数长度检查,如果全部的prompt的长度超过了8000,那么就删除这次拼装好的prompt

    @params  :None

    @return  :None

    '''


    def __checklen(self)-> None:
        while (self.__getlength() > 8000):
            del self.text[0]
        # return self.text
    '''
    @des  :获取LLM的反馈

    @params  :None

    @return  :string

    '''

    def response(self)-> str:
        return SparkApi.answer

SparkLLM继承LangChain的LLM

SparkLLM,继承了LangChain的LLM,参考了LangChain官方的CustomerLLM的写法。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File    :   iflytek.py
@Time    :   2023/10/27 17:28:58
@Author  :   CrissChan 
@Version :   1.0
@Site    :   https://blog.csdn.net/crisschan
@Desc    :   通过Langchain的customerLLM的方式,把讯飞的spark介入Langchain,按照Langchain的https://python.langchain.com/docs/modules/model_io/models/llms/custom_llm进行改写
'''
import logging
from typing import Any, List, Optional

from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.llms.base import LLM

from spark_middlerware import SparkMiddleware
class SparkLLM(LLM):
    #domain 代表需要调取spark的版本其中有三种值可选 "general"是v1.5版本,"generalv2"表示v2.0版本, "generalv3"表示v3.0版本,当前讯飞的星火就有三个版本
    domain :str
    @property
    def _llm_type(self) -> str:
        return "Spark"
    # @property
    # def _identifying_params(self) -> Mapping[str, Any]:
    #     """Get the identifying parameters."""
    #     _model_kwargs = self.model_kwargs or {}
    #     return {
    #         **{"endpoint_url": self.endpoint_url},
    #         **{"model_kwargs": _model_kwargs},
    #     }


    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        if stop is not None:
            raise ValueError("stop kwargs are not permitted.")
        # return prompt[: self.n]
        smw = SparkMiddleware(domain=self.domain,role='user',content=prompt)
        try:
            logging.debug("spark response :"+smw.response())
            return smw.response()
        except Exception as e:
            logging.debug(f"spark middlerware error :{e}")
            return "error"

总结

如上源代码拿来即可以用,其中需要Python的_变量以及将appid、api_secret、apikey存在项目根目录的env文件中就可以了。
如果对于
变量不熟悉可以学习:https://blog.csdn.net/crisschan/article/details/133277855?spm=1001.2014.3001.5501

PS

下载地址:https://download.csdn.net/download/chenlei_525/88496832

相关实践学习
使用CLup和iSCSI共享盘快速体验PolarDB for PostgtreSQL
在Clup云管控平台中快速体验创建与管理在iSCSI共享盘上的PolarDB for PostgtreSQL。
AnalyticDB PostgreSQL 企业智能数据中台:一站式管理数据服务资产
企业在数据仓库之上可构建丰富的数据服务用以支持数据应用及业务场景;ADB PG推出全新企业智能数据平台,用以帮助用户一站式的管理企业数据服务资产,包括创建, 管理,探索, 监控等; 助力企业在现有平台之上快速构建起数据服务资产体系
目录
相关文章
|
2月前
|
JSON API 数据格式
LangChain Agent:赋予 LLM 行动力的神秘力量
LangChain Agent 是什么?有什么用?基本原理是什么?那么多 Agent 类型在实际开发中又该如何选择?
173 8
LangChain Agent:赋予 LLM 行动力的神秘力量
|
2月前
|
小程序 调度 数据库
jeecg-boot集成xxl-job调度平台,每秒/每分钟/手动都能执行成功,但是设置固定时间不触发?
jeecg-boot集成xxl-job调度平台,每秒/每分钟/手动都能执行成功,但是设置固定时间不触发?
44 0
|
4月前
|
测试技术 Python
cypress 和allure 集成生成测试报告
cypress 和allure 集成生成测试报告
cypress 和allure 集成生成测试报告
|
4天前
|
jenkins 测试技术 持续交付
深入探索软件测试中的持续集成与自动化测试实践
【4月更文挑战第27天】 在当今软件开发的快速迭代过程中,持续集成(CI)和自动化测试已成为确保代码质量和加快交付速度的关键因素。本文将探讨如何通过实施持续集成流程,并结合自动化测试策略来优化软件测试工作。我们将分析持续集成的原理、自动化测试的最佳实践以及如何将这些方法应用于实际项目中,旨在为读者提供一套完整的解决方案,以提高软件项目的效率和质量。
11 3
|
5天前
|
测试技术 Python
python集成测试执行测试
【4月更文挑战第20天】
12 6
|
4月前
|
数据采集 DataWorks 数据管理
DataWorks不是Excel,它是一个数据集成和数据管理平台
DataWorks不是Excel,它是一个数据集成和数据管理平台
138 2
|
13天前
|
监控 测试技术 数据安全/隐私保护
如何将代理IP集成到自动化测试框架中?
如何将代理IP集成到自动化测试框架中?
|
15天前
|
人工智能
MIT等首次深度研究集成LLM预测能力:可媲美人类群体准确率
【4月更文挑战第16天】研究人员集成12个大型语言模型(LLM)组成“硅基群体”,在预测比赛中与925名人类预测者对比。研究发现,LLM群体的预测准确性与人类群体无显著差异,且通过集成可抵消个体模型的偏差,提高预测准确。GPT-4和Claude 2等模型结合人类预测后,准确度提升17%至28%。然而,个别LLM预测精度不一,模型选择和校准度是提升预测性能的关键,同时LLM在时间跨度和现实场景适应性方面仍有挑战。
22 6
MIT等首次深度研究集成LLM预测能力:可媲美人类群体准确率
|
17天前
|
分布式计算 Hadoop 测试技术
Hadoop【基础知识 05】【HDFS的JavaAPI】(集成及测试)
【4月更文挑战第5天】Hadoop【基础知识 05】【HDFS的JavaAPI】(集成及测试)
41 8
|
2月前
|
jenkins Java 持续交付
Docker搭建持续集成平台Jenkins最简教程
Jenkins 是一个广泛使用的开源持续集成工具,它能够自动化构建、测试和部署软件项目。在本文中,我们将使用 Docker 搭建一个基于 Jenkins 的持续集成平台。
123 2