为集成LLM到测试平台提供更便捷的方式:为讯飞的LLM星火创建接入LangChain类(全部源代码)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
实时数仓Hologres,5000CU*H 100GB 3个月
简介: 为集成LLM到测试平台提供更便捷的方式:为讯飞的LLM星火创建接入LangChain类(全部源代码)

为集成LLM到测试平台提供更便捷的方式:为讯飞的LLM星火创建接入LangChain类(全部源代码)
LangChain是一个强大的框架,旨在帮助开发人员使用语言模型构建端到端的应用程序。它提供了一套工具、组件和接口,可简化创建由大型语言模型 (LLM) 和聊天模型提供支持的应用程序的过程。LangChain 可以轻松管理与语言模型的交互,将多个组件链接在一起,并集成额外的资源,例如 API 和数据库。

讯飞已经发布了v3的模型,提供一定份额的免费API的tokens,对于尝试学习以及一些简单尝试非常友好,最近学习LangChain,发现没有讯飞星火的集成,因此自己动手写了一个。

官方的访问API

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File    :   SparkApi.py
@Time    :   2023/10/18 11:01:37
@Author  :   CrissChan 
@Version :   1.0
@Site    :   https://blog.csdn.net/crisschan
@Desc    :   
'''


import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time

import websocket  # 使用websocket_client
answer = ""

class Ws_Param(object):
    # 初始化
    def __init__(self, APPID, APIKey, APISecret, Spark_url):
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.host = urlparse(Spark_url).netloc
        self.path = urlparse(Spark_url).path
        self.Spark_url = Spark_url

    # 生成url
    def create_url(self):
        # 生成RFC1123格式的时间戳
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        # 拼接字符串
        signature_origin = "host: " + self.host + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + self.path + " HTTP/1.1"

        # 进行hmac-sha256进行加密
        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()

        signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')

        authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'

        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')

        # 将请求的鉴权参数组合为字典
        v = {
            "authorization": authorization,
            "date": date,
            "host": self.host
        }
        # 拼接鉴权参数,生成url
        url = self.Spark_url + '?' + urlencode(v)
        # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
        return url


# 收到websocket错误的处理
def on_error(ws, error):
    print("### error:", error)


# 收到websocket关闭的处理
def on_close(ws,one,two):
    print(" ")


# 收到websocket连接建立的处理
def on_open(ws):
    thread.start_new_thread(run, (ws,))


def run(ws, *args):
    data = json.dumps(gen_params(appid=ws.appid, domain= ws.domain,question=ws.question))
    ws.send(data)


# 收到websocket消息的处理
def on_message(ws, message):
    # print(message)
    data = json.loads(message)
    # print(data)
    code = data['header']['code']
    if code != 0:
        print(f'请求错误: {code}, {data}')
        ws.close()
    else:
        choices = data["payload"]["choices"]
        status = choices["status"]
        content = choices["text"][0]["content"]
        print(content,end ="")
        # 尝试找到可以转成python对象的list结构
        # print(content)
        global answer
        answer += content

        #print(1)
        if status == 2:
            ws.close()


def gen_params(appid, domain,question):
    """
    通过appid和用户的提问来生成请参数
    """
    data = {
        "header": {
            "app_id": appid,
            "uid": "1234"
        },
        "parameter": {
            "chat": {
                "domain": domain,
                "random_threshold": 0.5,
                "temperature": 0.5,
                "max_tokens": 2048,
                "auditing": "default"
            }
        },
        "payload": {
            "message": {
                "text": question
            }
        }
    }
    return data


def main(appid, api_key, api_secret, Spark_url,domain, question):
    # print("星火:")
    wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
    # websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
    ws.appid = appid
    ws.question = question
    ws.domain = domain
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

SparkAPI和LangChain中间的一个调用层

下面代码是一个SparkAPI和LangChain的调用层,方便更好封装Spark的参数。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File    :   spark_middlerware.py
@Time    :   2023/11/01 18:44:52
@Author  :   CrissChan 
@Version :   1.0
@Site    :   https://blog.csdn.net/crisschan
@Desc    :   链接sparkapi的中间件,中间控制版本,token上限等
'''



import SparkApi
import os
from dotenv import load_dotenv, find_dotenv

#以下密钥信息从控制台获取
class SparkMiddleware(object):

    _=load_dotenv(find_dotenv())
    appid = os.getenv("SPARK_APP_ID")
    api_secret=os.getenv("SPARK_APP_SECRET")
    api_key=os.getenv("SPARK_APP_KEY")


    #用于配置大模型版本,默认“general/generalv2”
    # domain = "general"   # v1.5版本
    # domain = "generalv2"    # v2.0版本
    #domain = "generalv3"    # v3.0版本
    #云端环境的服务地址
    # Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat"  # v1.5环境的地址
    # Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat"  # v2.0环境的地址ws(s)://spark-api.xf-yun.com/v2.1/chat
    # Spark_url = "ws://spark-api.xf-yun.com/v3.1/chat"  # v3.0环境的地址ws(s)://spark-api.xf-yun.com/v3.1/chat
    # 定义了sparkdomain和url的dict,这样在输入的时候就可以自动匹配对应的版本地址
    domain_url = {"general":"ws://spark-api.xf-yun.com/v1.1/chat",
                  "generalv2":"ws://spark-api.xf-yun.com/v2.1/chat",
                  "generalv3":"ws://spark-api.xf-yun.com/v3.1/chat",
                  }

    text =[]

    '''
    @des     :spark middlerware的构造函数,创建一个和封装sparkapi调用的参数的中间层   
    @params  : 
              domain 代表需要调取spark的版本其中有三种值可选 "general"是v1.5版本,"generalv2"表示v2.0版本, "generalv3"表示v3.0版本,当前讯飞的星火就有三个版本
              role 代表角色,星火的有两个角色“user”表示是用户的问题,“assistant”表示AI的回复
    @return  :None

    '''
    def __init__(self,domain,role,content) -> None:
        self.text.clear
        self.__getText(role,content)
        SparkApi.main(self.appid,self.api_key,self.api_secret,self.domain_url[domain],domain,self.text)

        pass
    '''
    @des  :拼装成访问参数中的text需要的格式   
    @params  : role 代表角色,星火的有两个角色user表示是用户的问题,assistant表示AI的回复
               content是用户输入的问题
    @return  :None

    '''

    def __getText(self,role,content) -> None:

        jsoncon = {}
        jsoncon["role"] = role
        jsoncon["content"] = content
        self.text.append(jsoncon)
        # return self.text
        self.__checklen()


    '''
    @des  :获取这次传递给llm的prompt的长度

    @params  :None

    @return  :None

    '''
    def __getlength(self)-> None:
        length = 0
        for content in self.text:
            temp = content["content"]
            leng = len(temp)
            length += leng
        return length
    '''
    @des  :参数长度检查,如果全部的prompt的长度超过了8000,那么就删除这次拼装好的prompt

    @params  :None

    @return  :None

    '''


    def __checklen(self)-> None:
        while (self.__getlength() > 8000):
            del self.text[0]
        # return self.text
    '''
    @des  :获取LLM的反馈

    @params  :None

    @return  :string

    '''

    def response(self)-> str:
        return SparkApi.answer

SparkLLM继承LangChain的LLM

SparkLLM,继承了LangChain的LLM,参考了LangChain官方的CustomerLLM的写法。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File    :   iflytek.py
@Time    :   2023/10/27 17:28:58
@Author  :   CrissChan 
@Version :   1.0
@Site    :   https://blog.csdn.net/crisschan
@Desc    :   通过Langchain的customerLLM的方式,把讯飞的spark介入Langchain,按照Langchain的https://python.langchain.com/docs/modules/model_io/models/llms/custom_llm进行改写
'''
import logging
from typing import Any, List, Optional

from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.llms.base import LLM

from spark_middlerware import SparkMiddleware
class SparkLLM(LLM):
    #domain 代表需要调取spark的版本其中有三种值可选 "general"是v1.5版本,"generalv2"表示v2.0版本, "generalv3"表示v3.0版本,当前讯飞的星火就有三个版本
    domain :str
    @property
    def _llm_type(self) -> str:
        return "Spark"
    # @property
    # def _identifying_params(self) -> Mapping[str, Any]:
    #     """Get the identifying parameters."""
    #     _model_kwargs = self.model_kwargs or {}
    #     return {
    #         **{"endpoint_url": self.endpoint_url},
    #         **{"model_kwargs": _model_kwargs},
    #     }


    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        if stop is not None:
            raise ValueError("stop kwargs are not permitted.")
        # return prompt[: self.n]
        smw = SparkMiddleware(domain=self.domain,role='user',content=prompt)
        try:
            logging.debug("spark response :"+smw.response())
            return smw.response()
        except Exception as e:
            logging.debug(f"spark middlerware error :{e}")
            return "error"

总结

如上源代码拿来即可以用,其中需要Python的_变量以及将appid、api_secret、apikey存在项目根目录的env文件中就可以了。
如果对于
变量不熟悉可以学习:https://blog.csdn.net/crisschan/article/details/133277855?spm=1001.2014.3001.5501

PS

下载地址:https://download.csdn.net/download/chenlei_525/88496832

相关实践学习
阿里云百炼xAnalyticDB PostgreSQL构建AIGC应用
通过该实验体验在阿里云百炼中构建企业专属知识库构建及应用全流程。同时体验使用ADB-PG向量检索引擎提供专属安全存储,保障企业数据隐私安全。
AnalyticDB PostgreSQL 企业智能数据中台:一站式管理数据服务资产
企业在数据仓库之上可构建丰富的数据服务用以支持数据应用及业务场景;ADB PG推出全新企业智能数据平台,用以帮助用户一站式的管理企业数据服务资产,包括创建, 管理,探索, 监控等; 助力企业在现有平台之上快速构建起数据服务资产体系
目录
相关文章
|
2月前
|
人工智能 网络协议 Java
RuoYi AI:1人搞定AI中台!开源全栈式AI开发平台,快速集成大模型+RAG+支付等模块
RuoYi AI 是一个全栈式 AI 开发平台,支持本地 RAG 方案,集成多种大语言模型和多媒体功能,适合企业和个人开发者快速搭建个性化 AI 应用。
990 21
RuoYi AI:1人搞定AI中台!开源全栈式AI开发平台,快速集成大模型+RAG+支付等模块
|
14天前
|
关系型数据库 大数据 MySQL
【能力比对】数据集成管理VS数据集成平台VS数据同步平台
杭州奥零数据科技有限公司成立于2023年,专注于数据中台业务,维护开源项目AllData并提供商业版解决方案。AllData提供数据集成、存储、开发、治理及BI展示等一站式服务,支持AI大模型应用,助力企业高效利用数据价值。
【能力比对】数据集成管理VS数据集成平台VS数据同步平台
|
20天前
|
存储 人工智能 监控
通过Milvus和Langchain快速构建基于百炼大模型的LLM问答系统
阿里云向量检索服务Milvus版是一款全托管向量检索引擎,并确保与开源Milvus的完全兼容性,支持无缝迁移。它在开源版本的基础上增强了可扩展性,能提供大规模AI向量数据的相似性检索服务。凭借其开箱即用的特性、灵活的扩展能力和全链路监控告警,Milvus云服务成为多样化AI应用场景的理想选择,包括多模态搜索、检索增强生成(RAG)、搜索推荐、内容风险识别等。您还可以利用开源的Attu工具进行可视化操作,进一步促进应用的快速开发和部署。
|
1月前
|
人工智能 自然语言处理 数据可视化
89.4K star!这个开源LLM应用开发平台,让你轻松构建AI工作流!
Dify 是一款开源的 LLM 应用开发平台,通过直观的可视化界面整合 AI 工作流、RAG 管道、智能代理等功能,助你快速实现从原型到生产的跨越。支持本地部署和云端服务,提供企业级功能与完整 API 接口。
|
2月前
|
人工智能 自然语言处理 文字识别
解读 | 金融长上下文基准测试FailSafeQA:解锁金融领域LLM真实的审慎性和容错性
近年来,大型语言模型(LLMs)在金融领域的应用如火如荼,从风险分析到客户服务,它们正逐步改变行业的游戏规则。然而,这些模型是否真的足够“靠谱”?面对复杂的金融数据和多变的用户输入,它们还能保持精准和稳健吗?
97 8
解读 | 金融长上下文基准测试FailSafeQA:解锁金融领域LLM真实的审慎性和容错性
|
2月前
|
SQL 关系型数据库 MySQL
【亲测有用】数据集成平台能力演示(支持国产数据库DaMeng与KingBase)
杭州奥零数据科技有限公司成立于2023年,专注于数据中台业务,维护开源项目AllData并提供商业版解决方案。AllData提供数据集成、存储、开发、治理及BI展示等一站式服务,支持AI大模型应用,助力企业高效利用数据价值。
【亲测有用】数据集成平台能力演示(支持国产数据库DaMeng与KingBase)
|
3月前
|
人工智能 安全 机器人
LangBot:无缝集成到QQ、微信等消息平台的AI聊天机器人平台
LangBot 是一个开源的多模态即时聊天机器人平台,支持多种即时通信平台和大语言模型,具备多模态交互、插件扩展和Web管理面板等功能。
839 14
LangBot:无缝集成到QQ、微信等消息平台的AI聊天机器人平台
|
4月前
|
NoSQL 大数据 关系型数据库
AllData数据中台核心菜单十一:数据集成平台
杭州奥零数据科技有限公司成立于2023年,专注于数据中台业务,维护开源项目AllData并提供商业版解决方案。AllData提供数据集成、存储、开发、治理及BI展示等一站式服务,支持AI大模型应用,助力企业高效利用数据价值。
AllData数据中台核心菜单十一:数据集成平台
|
3月前
|
存储 人工智能 NoSQL
Airweave:快速集成应用数据打造AI知识库的开源平台,支持多源整合和自动同步数据
Airweave 是一个开源工具,能够将应用程序的数据同步到图数据库和向量数据库中,实现智能代理检索。它支持无代码集成、多租户支持和自动同步等功能。
192 14
|
4月前
|
人工智能 数据可视化 开发者
FlowiseAI:34K Star!集成多种模型和100+组件的 LLM 应用低代码开发平台,拖拽组件轻松构建程序
FlowiseAI 是一款开源的低代码工具,通过拖拽可视化组件,用户可以快速构建自定义的 LLM 应用程序,支持多模型集成和记忆功能。
322 14
FlowiseAI:34K Star!集成多种模型和100+组件的 LLM 应用低代码开发平台,拖拽组件轻松构建程序