为集成LLM到测试平台提供更便捷的方式:为讯飞的LLM星火创建接入LangChain类(全部源代码)

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
实时计算 Flink 版,5000CU*H 3个月
简介: 为集成LLM到测试平台提供更便捷的方式:为讯飞的LLM星火创建接入LangChain类(全部源代码)

为集成LLM到测试平台提供更便捷的方式:为讯飞的LLM星火创建接入LangChain类(全部源代码)
LangChain是一个强大的框架,旨在帮助开发人员使用语言模型构建端到端的应用程序。它提供了一套工具、组件和接口,可简化创建由大型语言模型 (LLM) 和聊天模型提供支持的应用程序的过程。LangChain 可以轻松管理与语言模型的交互,将多个组件链接在一起,并集成额外的资源,例如 API 和数据库。

讯飞已经发布了v3的模型,提供一定份额的免费API的tokens,对于尝试学习以及一些简单尝试非常友好,最近学习LangChain,发现没有讯飞星火的集成,因此自己动手写了一个。

官方的访问API

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File    :   SparkApi.py
@Time    :   2023/10/18 11:01:37
@Author  :   CrissChan 
@Version :   1.0
@Site    :   https://blog.csdn.net/crisschan
@Desc    :   
'''


import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time

import websocket  # 使用websocket_client
answer = ""

class Ws_Param(object):
    # 初始化
    def __init__(self, APPID, APIKey, APISecret, Spark_url):
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.host = urlparse(Spark_url).netloc
        self.path = urlparse(Spark_url).path
        self.Spark_url = Spark_url

    # 生成url
    def create_url(self):
        # 生成RFC1123格式的时间戳
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        # 拼接字符串
        signature_origin = "host: " + self.host + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + self.path + " HTTP/1.1"

        # 进行hmac-sha256进行加密
        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()

        signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')

        authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'

        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')

        # 将请求的鉴权参数组合为字典
        v = {
            "authorization": authorization,
            "date": date,
            "host": self.host
        }
        # 拼接鉴权参数,生成url
        url = self.Spark_url + '?' + urlencode(v)
        # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
        return url


# 收到websocket错误的处理
def on_error(ws, error):
    print("### error:", error)


# 收到websocket关闭的处理
def on_close(ws,one,two):
    print(" ")


# 收到websocket连接建立的处理
def on_open(ws):
    thread.start_new_thread(run, (ws,))


def run(ws, *args):
    data = json.dumps(gen_params(appid=ws.appid, domain= ws.domain,question=ws.question))
    ws.send(data)


# 收到websocket消息的处理
def on_message(ws, message):
    # print(message)
    data = json.loads(message)
    # print(data)
    code = data['header']['code']
    if code != 0:
        print(f'请求错误: {code}, {data}')
        ws.close()
    else:
        choices = data["payload"]["choices"]
        status = choices["status"]
        content = choices["text"][0]["content"]
        print(content,end ="")
        # 尝试找到可以转成python对象的list结构
        # print(content)
        global answer
        answer += content

        #print(1)
        if status == 2:
            ws.close()


def gen_params(appid, domain,question):
    """
    通过appid和用户的提问来生成请参数
    """
    data = {
        "header": {
            "app_id": appid,
            "uid": "1234"
        },
        "parameter": {
            "chat": {
                "domain": domain,
                "random_threshold": 0.5,
                "temperature": 0.5,
                "max_tokens": 2048,
                "auditing": "default"
            }
        },
        "payload": {
            "message": {
                "text": question
            }
        }
    }
    return data


def main(appid, api_key, api_secret, Spark_url,domain, question):
    # print("星火:")
    wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
    # websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
    ws.appid = appid
    ws.question = question
    ws.domain = domain
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

SparkAPI和LangChain中间的一个调用层

下面代码是一个SparkAPI和LangChain的调用层,方便更好封装Spark的参数。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File    :   spark_middlerware.py
@Time    :   2023/11/01 18:44:52
@Author  :   CrissChan 
@Version :   1.0
@Site    :   https://blog.csdn.net/crisschan
@Desc    :   链接sparkapi的中间件,中间控制版本,token上限等
'''



import SparkApi
import os
from dotenv import load_dotenv, find_dotenv

#以下密钥信息从控制台获取
class SparkMiddleware(object):

    _=load_dotenv(find_dotenv())
    appid = os.getenv("SPARK_APP_ID")
    api_secret=os.getenv("SPARK_APP_SECRET")
    api_key=os.getenv("SPARK_APP_KEY")


    #用于配置大模型版本,默认“general/generalv2”
    # domain = "general"   # v1.5版本
    # domain = "generalv2"    # v2.0版本
    #domain = "generalv3"    # v3.0版本
    #云端环境的服务地址
    # Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat"  # v1.5环境的地址
    # Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat"  # v2.0环境的地址ws(s)://spark-api.xf-yun.com/v2.1/chat
    # Spark_url = "ws://spark-api.xf-yun.com/v3.1/chat"  # v3.0环境的地址ws(s)://spark-api.xf-yun.com/v3.1/chat
    # 定义了sparkdomain和url的dict,这样在输入的时候就可以自动匹配对应的版本地址
    domain_url = {"general":"ws://spark-api.xf-yun.com/v1.1/chat",
                  "generalv2":"ws://spark-api.xf-yun.com/v2.1/chat",
                  "generalv3":"ws://spark-api.xf-yun.com/v3.1/chat",
                  }

    text =[]

    '''
    @des     :spark middlerware的构造函数,创建一个和封装sparkapi调用的参数的中间层   
    @params  : 
              domain 代表需要调取spark的版本其中有三种值可选 "general"是v1.5版本,"generalv2"表示v2.0版本, "generalv3"表示v3.0版本,当前讯飞的星火就有三个版本
              role 代表角色,星火的有两个角色“user”表示是用户的问题,“assistant”表示AI的回复
    @return  :None

    '''
    def __init__(self,domain,role,content) -> None:
        self.text.clear
        self.__getText(role,content)
        SparkApi.main(self.appid,self.api_key,self.api_secret,self.domain_url[domain],domain,self.text)

        pass
    '''
    @des  :拼装成访问参数中的text需要的格式   
    @params  : role 代表角色,星火的有两个角色user表示是用户的问题,assistant表示AI的回复
               content是用户输入的问题
    @return  :None

    '''

    def __getText(self,role,content) -> None:

        jsoncon = {}
        jsoncon["role"] = role
        jsoncon["content"] = content
        self.text.append(jsoncon)
        # return self.text
        self.__checklen()


    '''
    @des  :获取这次传递给llm的prompt的长度

    @params  :None

    @return  :None

    '''
    def __getlength(self)-> None:
        length = 0
        for content in self.text:
            temp = content["content"]
            leng = len(temp)
            length += leng
        return length
    '''
    @des  :参数长度检查,如果全部的prompt的长度超过了8000,那么就删除这次拼装好的prompt

    @params  :None

    @return  :None

    '''


    def __checklen(self)-> None:
        while (self.__getlength() > 8000):
            del self.text[0]
        # return self.text
    '''
    @des  :获取LLM的反馈

    @params  :None

    @return  :string

    '''

    def response(self)-> str:
        return SparkApi.answer

SparkLLM继承LangChain的LLM

SparkLLM,继承了LangChain的LLM,参考了LangChain官方的CustomerLLM的写法。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File    :   iflytek.py
@Time    :   2023/10/27 17:28:58
@Author  :   CrissChan 
@Version :   1.0
@Site    :   https://blog.csdn.net/crisschan
@Desc    :   通过Langchain的customerLLM的方式,把讯飞的spark介入Langchain,按照Langchain的https://python.langchain.com/docs/modules/model_io/models/llms/custom_llm进行改写
'''
import logging
from typing import Any, List, Optional

from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.llms.base import LLM

from spark_middlerware import SparkMiddleware
class SparkLLM(LLM):
    #domain 代表需要调取spark的版本其中有三种值可选 "general"是v1.5版本,"generalv2"表示v2.0版本, "generalv3"表示v3.0版本,当前讯飞的星火就有三个版本
    domain :str
    @property
    def _llm_type(self) -> str:
        return "Spark"
    # @property
    # def _identifying_params(self) -> Mapping[str, Any]:
    #     """Get the identifying parameters."""
    #     _model_kwargs = self.model_kwargs or {}
    #     return {
    #         **{"endpoint_url": self.endpoint_url},
    #         **{"model_kwargs": _model_kwargs},
    #     }


    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        if stop is not None:
            raise ValueError("stop kwargs are not permitted.")
        # return prompt[: self.n]
        smw = SparkMiddleware(domain=self.domain,role='user',content=prompt)
        try:
            logging.debug("spark response :"+smw.response())
            return smw.response()
        except Exception as e:
            logging.debug(f"spark middlerware error :{e}")
            return "error"

总结

如上源代码拿来即可以用,其中需要Python的_变量以及将appid、api_secret、apikey存在项目根目录的env文件中就可以了。
如果对于
变量不熟悉可以学习:https://blog.csdn.net/crisschan/article/details/133277855?spm=1001.2014.3001.5501

PS

下载地址:https://download.csdn.net/download/chenlei_525/88496832

相关实践学习
阿里云百炼xAnalyticDB PostgreSQL构建AIGC应用
通过该实验体验在阿里云百炼中构建企业专属知识库构建及应用全流程。同时体验使用ADB-PG向量检索引擎提供专属安全存储,保障企业数据隐私安全。
AnalyticDB PostgreSQL 企业智能数据中台:一站式管理数据服务资产
企业在数据仓库之上可构建丰富的数据服务用以支持数据应用及业务场景;ADB PG推出全新企业智能数据平台,用以帮助用户一站式的管理企业数据服务资产,包括创建, 管理,探索, 监控等; 助力企业在现有平台之上快速构建起数据服务资产体系
目录
相关文章
|
30天前
|
前端开发 机器人 API
前端大模型入门(一):用 js+langchain 构建基于 LLM 的应用
本文介绍了大语言模型(LLM)的HTTP API流式调用机制及其在前端的实现方法。通过流式调用,服务器可以逐步发送生成的文本内容,前端则实时处理并展示这些数据块,从而提升用户体验和实时性。文章详细讲解了如何使用`fetch`发起流式请求、处理响应流数据、逐步更新界面、处理中断和错误,以及优化用户交互。流式调用特别适用于聊天机器人、搜索建议等应用场景,能够显著减少用户的等待时间,增强交互性。
229 2
|
3天前
|
jenkins 测试技术 持续交付
软件测试中的自动化与持续集成:提升效率与质量的关键
在快节奏的软件开发环境中,自动化测试和持续集成已经成为不可或缺的部分。本文将探讨自动化测试和持续集成的重要性,以及它们如何协同工作以提高软件开发的效率和质量。通过分析自动化测试的策略、工具选择以及持续集成的实践,我们将揭示这些技术如何帮助开发团队快速响应变化,减少错误,并加速产品上市时间。
|
2天前
|
机器学习/深度学习 人工智能 jenkins
软件测试中的自动化与持续集成实践
在快速迭代的软件开发过程中,自动化测试和持续集成(CI)是确保代码质量和加速产品上市的关键。本文探讨了自动化测试的重要性、常见的自动化测试工具以及如何将自动化测试整合到持续集成流程中,以提高软件测试的效率和可靠性。通过案例分析,展示了自动化测试和持续集成在实际项目中的应用效果,并提供了实施建议。
|
5天前
|
前端开发 JavaScript 测试技术
前端测试技术中,如何提高集成测试的效率?
前端测试技术中,如何提高集成测试的效率?
|
28天前
|
缓存 Devops jenkins
专家视角:构建可维护的测试架构与持续集成
【10月更文挑战第14天】在现代软件开发过程中,构建一个可维护且易于扩展的测试架构对于确保产品质量至关重要。本文将探讨如何设计这样的测试架构,并将单元测试无缝地融入持续集成(CI)流程之中。我们将讨论最佳实践、自动化测试部署、性能优化技巧以及如何管理和扩展日益增长的测试套件规模。
43 3
|
15天前
|
JSON 数据可视化 NoSQL
基于LLM Graph Transformer的知识图谱构建技术研究:LangChain框架下转换机制实践
本文介绍了LangChain的LLM Graph Transformer框架,探讨了文本到图谱转换的双模式实现机制。基于工具的模式利用结构化输出和函数调用,简化了提示工程并支持属性提取;基于提示的模式则为不支持工具调用的模型提供了备选方案。通过精确定义图谱模式(包括节点类型、关系类型及其约束),显著提升了提取结果的一致性和可靠性。LLM Graph Transformer为非结构化数据的结构化表示提供了可靠的技术方案,支持RAG应用和复杂查询处理。
60 2
基于LLM Graph Transformer的知识图谱构建技术研究:LangChain框架下转换机制实践
|
5天前
|
敏捷开发 Devops 测试技术
自动化测试中的持续集成与持续部署
在现代软件开发实践中,自动化测试是确保软件质量和快速迭代的关键。本文将探讨自动化测试如何与持续集成(CI)和持续部署(CD)流程相结合,以提高开发效率和软件质量。我们将分析CI/CD管道中自动化测试的最佳实践,以及如何克服实施过程中的挑战。
24 6
|
7天前
|
jenkins 测试技术 持续交付
探索软件测试中的自动化与持续集成
本文深入探讨了软件测试领域中自动化测试和持续集成的融合应用,分析了这种结合如何提升软件开发的效率和质量。通过具体案例分析,展示了自动化测试和持续集成在软件开发生命周期中的关键作用及其实施策略。
|
9天前
|
监控 安全 测试技术
构建高效的精准测试平台:设计与实现指南
在软件开发过程中,精准测试是确保产品质量和性能的关键环节。一个精准的测试平台能够自动化测试流程,提高测试效率,缩短测试周期,并提供准确的测试结果。本文将分享如何设计和实现一个精准测试平台,从需求分析到技术选型,再到具体的实现步骤。
44 1
|
17天前
|
监控 jenkins 测试技术
探索软件测试的新篇章:自动化与持续集成
【10月更文挑战第25天】在数字化时代的浪潮中,软件已成为驱动世界的核心力量。然而,随着软件复杂性的增加,传统的测试方法已无法满足快速迭代和高质量交付的需求。本文将探讨如何通过自动化测试和持续集成(CI)来提升软件开发的效率和质量,同时确保产品的稳定性和可靠性。我们将从自动化测试的基础出发,逐步深入到持续集成的实践,并展示如何通过实际案例实现这一转变。