为集成LLM到测试平台提供更便捷的方式:为讯飞的LLM星火创建接入LangChain类(全部源代码)

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
简介: 为集成LLM到测试平台提供更便捷的方式:为讯飞的LLM星火创建接入LangChain类(全部源代码)

为集成LLM到测试平台提供更便捷的方式:为讯飞的LLM星火创建接入LangChain类(全部源代码)
LangChain是一个强大的框架,旨在帮助开发人员使用语言模型构建端到端的应用程序。它提供了一套工具、组件和接口,可简化创建由大型语言模型 (LLM) 和聊天模型提供支持的应用程序的过程。LangChain 可以轻松管理与语言模型的交互,将多个组件链接在一起,并集成额外的资源,例如 API 和数据库。

讯飞已经发布了v3的模型,提供一定份额的免费API的tokens,对于尝试学习以及一些简单尝试非常友好,最近学习LangChain,发现没有讯飞星火的集成,因此自己动手写了一个。

官方的访问API

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File    :   SparkApi.py
@Time    :   2023/10/18 11:01:37
@Author  :   CrissChan 
@Version :   1.0
@Site    :   https://blog.csdn.net/crisschan
@Desc    :   
'''


import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time

import websocket  # 使用websocket_client
answer = ""

class Ws_Param(object):
    # 初始化
    def __init__(self, APPID, APIKey, APISecret, Spark_url):
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.host = urlparse(Spark_url).netloc
        self.path = urlparse(Spark_url).path
        self.Spark_url = Spark_url

    # 生成url
    def create_url(self):
        # 生成RFC1123格式的时间戳
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        # 拼接字符串
        signature_origin = "host: " + self.host + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + self.path + " HTTP/1.1"

        # 进行hmac-sha256进行加密
        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()

        signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')

        authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'

        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')

        # 将请求的鉴权参数组合为字典
        v = {
            "authorization": authorization,
            "date": date,
            "host": self.host
        }
        # 拼接鉴权参数,生成url
        url = self.Spark_url + '?' + urlencode(v)
        # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
        return url


# 收到websocket错误的处理
def on_error(ws, error):
    print("### error:", error)


# 收到websocket关闭的处理
def on_close(ws,one,two):
    print(" ")


# 收到websocket连接建立的处理
def on_open(ws):
    thread.start_new_thread(run, (ws,))


def run(ws, *args):
    data = json.dumps(gen_params(appid=ws.appid, domain= ws.domain,question=ws.question))
    ws.send(data)


# 收到websocket消息的处理
def on_message(ws, message):
    # print(message)
    data = json.loads(message)
    # print(data)
    code = data['header']['code']
    if code != 0:
        print(f'请求错误: {code}, {data}')
        ws.close()
    else:
        choices = data["payload"]["choices"]
        status = choices["status"]
        content = choices["text"][0]["content"]
        print(content,end ="")
        # 尝试找到可以转成python对象的list结构
        # print(content)
        global answer
        answer += content

        #print(1)
        if status == 2:
            ws.close()


def gen_params(appid, domain,question):
    """
    通过appid和用户的提问来生成请参数
    """
    data = {
        "header": {
            "app_id": appid,
            "uid": "1234"
        },
        "parameter": {
            "chat": {
                "domain": domain,
                "random_threshold": 0.5,
                "temperature": 0.5,
                "max_tokens": 2048,
                "auditing": "default"
            }
        },
        "payload": {
            "message": {
                "text": question
            }
        }
    }
    return data


def main(appid, api_key, api_secret, Spark_url,domain, question):
    # print("星火:")
    wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
    # websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
    ws.appid = appid
    ws.question = question
    ws.domain = domain
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

SparkAPI和LangChain中间的一个调用层

下面代码是一个SparkAPI和LangChain的调用层,方便更好封装Spark的参数。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File    :   spark_middlerware.py
@Time    :   2023/11/01 18:44:52
@Author  :   CrissChan 
@Version :   1.0
@Site    :   https://blog.csdn.net/crisschan
@Desc    :   链接sparkapi的中间件,中间控制版本,token上限等
'''



import SparkApi
import os
from dotenv import load_dotenv, find_dotenv

#以下密钥信息从控制台获取
class SparkMiddleware(object):

    _=load_dotenv(find_dotenv())
    appid = os.getenv("SPARK_APP_ID")
    api_secret=os.getenv("SPARK_APP_SECRET")
    api_key=os.getenv("SPARK_APP_KEY")


    #用于配置大模型版本,默认“general/generalv2”
    # domain = "general"   # v1.5版本
    # domain = "generalv2"    # v2.0版本
    #domain = "generalv3"    # v3.0版本
    #云端环境的服务地址
    # Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat"  # v1.5环境的地址
    # Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat"  # v2.0环境的地址ws(s)://spark-api.xf-yun.com/v2.1/chat
    # Spark_url = "ws://spark-api.xf-yun.com/v3.1/chat"  # v3.0环境的地址ws(s)://spark-api.xf-yun.com/v3.1/chat
    # 定义了sparkdomain和url的dict,这样在输入的时候就可以自动匹配对应的版本地址
    domain_url = {"general":"ws://spark-api.xf-yun.com/v1.1/chat",
                  "generalv2":"ws://spark-api.xf-yun.com/v2.1/chat",
                  "generalv3":"ws://spark-api.xf-yun.com/v3.1/chat",
                  }

    text =[]

    '''
    @des     :spark middlerware的构造函数,创建一个和封装sparkapi调用的参数的中间层   
    @params  : 
              domain 代表需要调取spark的版本其中有三种值可选 "general"是v1.5版本,"generalv2"表示v2.0版本, "generalv3"表示v3.0版本,当前讯飞的星火就有三个版本
              role 代表角色,星火的有两个角色“user”表示是用户的问题,“assistant”表示AI的回复
    @return  :None

    '''
    def __init__(self,domain,role,content) -> None:
        self.text.clear
        self.__getText(role,content)
        SparkApi.main(self.appid,self.api_key,self.api_secret,self.domain_url[domain],domain,self.text)

        pass
    '''
    @des  :拼装成访问参数中的text需要的格式   
    @params  : role 代表角色,星火的有两个角色user表示是用户的问题,assistant表示AI的回复
               content是用户输入的问题
    @return  :None

    '''

    def __getText(self,role,content) -> None:

        jsoncon = {}
        jsoncon["role"] = role
        jsoncon["content"] = content
        self.text.append(jsoncon)
        # return self.text
        self.__checklen()


    '''
    @des  :获取这次传递给llm的prompt的长度

    @params  :None

    @return  :None

    '''
    def __getlength(self)-> None:
        length = 0
        for content in self.text:
            temp = content["content"]
            leng = len(temp)
            length += leng
        return length
    '''
    @des  :参数长度检查,如果全部的prompt的长度超过了8000,那么就删除这次拼装好的prompt

    @params  :None

    @return  :None

    '''


    def __checklen(self)-> None:
        while (self.__getlength() > 8000):
            del self.text[0]
        # return self.text
    '''
    @des  :获取LLM的反馈

    @params  :None

    @return  :string

    '''

    def response(self)-> str:
        return SparkApi.answer

SparkLLM继承LangChain的LLM

SparkLLM,继承了LangChain的LLM,参考了LangChain官方的CustomerLLM的写法。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File    :   iflytek.py
@Time    :   2023/10/27 17:28:58
@Author  :   CrissChan 
@Version :   1.0
@Site    :   https://blog.csdn.net/crisschan
@Desc    :   通过Langchain的customerLLM的方式,把讯飞的spark介入Langchain,按照Langchain的https://python.langchain.com/docs/modules/model_io/models/llms/custom_llm进行改写
'''
import logging
from typing import Any, List, Optional

from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.llms.base import LLM

from spark_middlerware import SparkMiddleware
class SparkLLM(LLM):
    #domain 代表需要调取spark的版本其中有三种值可选 "general"是v1.5版本,"generalv2"表示v2.0版本, "generalv3"表示v3.0版本,当前讯飞的星火就有三个版本
    domain :str
    @property
    def _llm_type(self) -> str:
        return "Spark"
    # @property
    # def _identifying_params(self) -> Mapping[str, Any]:
    #     """Get the identifying parameters."""
    #     _model_kwargs = self.model_kwargs or {}
    #     return {
    #         **{"endpoint_url": self.endpoint_url},
    #         **{"model_kwargs": _model_kwargs},
    #     }


    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        if stop is not None:
            raise ValueError("stop kwargs are not permitted.")
        # return prompt[: self.n]
        smw = SparkMiddleware(domain=self.domain,role='user',content=prompt)
        try:
            logging.debug("spark response :"+smw.response())
            return smw.response()
        except Exception as e:
            logging.debug(f"spark middlerware error :{e}")
            return "error"

总结

如上源代码拿来即可以用,其中需要Python的_变量以及将appid、api_secret、apikey存在项目根目录的env文件中就可以了。
如果对于
变量不熟悉可以学习:https://blog.csdn.net/crisschan/article/details/133277855?spm=1001.2014.3001.5501

PS

下载地址:https://download.csdn.net/download/chenlei_525/88496832

相关实践学习
阿里云百炼xAnalyticDB PostgreSQL构建AIGC应用
通过该实验体验在阿里云百炼中构建企业专属知识库构建及应用全流程。同时体验使用ADB-PG向量检索引擎提供专属安全存储,保障企业数据隐私安全。
AnalyticDB PostgreSQL 企业智能数据中台:一站式管理数据服务资产
企业在数据仓库之上可构建丰富的数据服务用以支持数据应用及业务场景;ADB PG推出全新企业智能数据平台,用以帮助用户一站式的管理企业数据服务资产,包括创建, 管理,探索, 监控等; 助力企业在现有平台之上快速构建起数据服务资产体系
目录
相关文章
|
1月前
|
人工智能 数据可视化 开发者
FlowiseAI:34K Star!集成多种模型和100+组件的 LLM 应用低代码开发平台,拖拽组件轻松构建程序
FlowiseAI 是一款开源的低代码工具,通过拖拽可视化组件,用户可以快速构建自定义的 LLM 应用程序,支持多模型集成和记忆功能。
116 14
FlowiseAI:34K Star!集成多种模型和100+组件的 LLM 应用低代码开发平台,拖拽组件轻松构建程序
|
2月前
|
机器学习/深度学习 人工智能 jenkins
探索软件测试中的自动化与持续集成
【10月更文挑战第21天】 在软件开发的生命周期中,软件测试扮演着至关重要的角色。随着技术的进步和开发模式的转变,自动化测试和持续集成已经成为提高软件质量和效率的关键手段。本文将深入探讨自动化测试和持续集成的概念、实施策略以及它们如何相互配合以优化软件开发流程。我们将通过分析实际案例,展示这些技术如何在实际项目中发挥作用,以及面临的挑战和解决方案。此外,文章还将讨论未来趋势,包括人工智能在测试领域的应用前景。
97 17
|
2月前
|
弹性计算 自然语言处理 数据库
通过阿里云Milvus和LangChain快速构建LLM问答系统
本文介绍如何通过整合阿里云Milvus、阿里云DashScope Embedding模型与阿里云PAI(EAS)模型服务,构建一个由LLM(大型语言模型)驱动的问题解答应用,并着重演示了如何搭建基于这些技术的RAG对话系统。
|
2月前
|
人工智能 自然语言处理 前端开发
CodeArena:在线 LLM 编程竞技场!用于测试不同开源 LLM 的编程能力,实时更新排行榜
CodeArena 是一个在线平台,用于测试和比较不同大型语言模型(LLM)的编程能力。通过实时显示多个 LLM 的代码生成过程和结果,帮助开发者选择适合的 LLM,并推动 LLM 技术的发展。
85 7
CodeArena:在线 LLM 编程竞技场!用于测试不同开源 LLM 的编程能力,实时更新排行榜
|
2月前
|
jenkins 测试技术 持续交付
软件测试中的自动化与持续集成
在现代软件开发过程中,自动化测试和持续集成已成为不可或缺的组成部分。本文将深入探讨自动化测试和持续集成的重要性、优势以及如何有效实施它们以提升软件质量和开发效率。通过具体案例分析,我们将展示这些技术如何在实际项目中发挥作用,并讨论其面临的挑战及应对策略。
80 3
|
3月前
|
Devops 测试技术 持续交付
软件测试中的自动化与持续集成:最佳实践与挑战
在快速迭代的软件开发周期中,自动化测试和持续集成(CI)已成为提高软件质量和加速产品上市的关键策略。本文探讨了自动化测试和CI的实施如何帮助开发团队提前发现缺陷、缩短反馈循环,并确保代码质量。我们将深入分析自动化测试的策略选择、工具应用以及面临的挑战,同时提供一些克服这些挑战的最佳实践。
88 0
|
3月前
|
监控 jenkins 测试技术
探索软件测试中的自动化与持续集成####
本文旨在探讨软件测试中自动化测试与持续集成(CI)的融合实践,分析其对提升软件开发效率和质量的重要性。通过深入剖析自动化测试的优势、持续集成的核心概念以及两者结合的最佳实践案例,揭示这一技术趋势如何重塑现代软件开发流程。文章还将讨论实施过程中的挑战和应对策略,为读者提供一套实用的方法论指导。 ####
|
3月前
|
监控 jenkins 测试技术
探索软件测试中的自动化与持续集成
在快速迭代的软件开发周期中,自动化测试和持续集成已成为保证软件质量的关键因素。本文将深入探讨自动化测试的重要性、持续集成的概念及其实践方法,并分析它们如何共同作用于提升软件开发效率和产品质量。通过实际案例分析,我们将了解这些技术是如何在不同的开发环境中被应用,以及它们带来的具体益处。此外,文章还将讨论实施自动化测试和持续集成过程中可能遇到的挑战及解决方案,为读者提供实用的指导和建议。
59 0
|
3月前
|
jenkins 测试技术 持续交付
探索自动化测试在持续集成中的应用与挑战
本文深入探讨了自动化测试在现代软件开发流程,特别是持续集成(CI)环境中的关键作用。通过分析自动化测试的优势、实施策略以及面临的主要挑战,旨在为开发团队提供实用的指导和建议。文章不仅概述了自动化测试的基本原理和最佳实践,还详细讨论了如何克服实施过程中遇到的技术难题和管理障碍,以实现更高效、更可靠的软件交付。
|
3月前
|
敏捷开发 测试技术 持续交付
探索软件测试中的自动化与持续集成
在快速迭代的软件开发环境中,自动化测试和持续集成(CI)已成为确保产品质量和加速交付的关键策略。本文将深入探讨自动化测试的基本原理、实施步骤以及它如何与持续集成流程相结合,以提高软件开发的效率和可靠性。我们将通过实际案例分析,展示自动化测试和CI的最佳实践,以及它们如何帮助企业实现更快的市场响应时间和更高的客户满意度。
80 16