为集成LLM到测试平台提供更便捷的方式:为讯飞的LLM星火创建接入LangChain类(全部源代码)

本文涉及的产品
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
简介: 为集成LLM到测试平台提供更便捷的方式:为讯飞的LLM星火创建接入LangChain类(全部源代码)

为集成LLM到测试平台提供更便捷的方式:为讯飞的LLM星火创建接入LangChain类(全部源代码)
LangChain是一个强大的框架,旨在帮助开发人员使用语言模型构建端到端的应用程序。它提供了一套工具、组件和接口,可简化创建由大型语言模型 (LLM) 和聊天模型提供支持的应用程序的过程。LangChain 可以轻松管理与语言模型的交互,将多个组件链接在一起,并集成额外的资源,例如 API 和数据库。

讯飞已经发布了v3的模型,提供一定份额的免费API的tokens,对于尝试学习以及一些简单尝试非常友好,最近学习LangChain,发现没有讯飞星火的集成,因此自己动手写了一个。

官方的访问API

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File    :   SparkApi.py
@Time    :   2023/10/18 11:01:37
@Author  :   CrissChan 
@Version :   1.0
@Site    :   https://blog.csdn.net/crisschan
@Desc    :   
'''


import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time

import websocket  # 使用websocket_client
answer = ""

class Ws_Param(object):
    # 初始化
    def __init__(self, APPID, APIKey, APISecret, Spark_url):
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.host = urlparse(Spark_url).netloc
        self.path = urlparse(Spark_url).path
        self.Spark_url = Spark_url

    # 生成url
    def create_url(self):
        # 生成RFC1123格式的时间戳
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        # 拼接字符串
        signature_origin = "host: " + self.host + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + self.path + " HTTP/1.1"

        # 进行hmac-sha256进行加密
        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()

        signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')

        authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'

        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')

        # 将请求的鉴权参数组合为字典
        v = {
            "authorization": authorization,
            "date": date,
            "host": self.host
        }
        # 拼接鉴权参数,生成url
        url = self.Spark_url + '?' + urlencode(v)
        # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
        return url


# 收到websocket错误的处理
def on_error(ws, error):
    print("### error:", error)


# 收到websocket关闭的处理
def on_close(ws,one,two):
    print(" ")


# 收到websocket连接建立的处理
def on_open(ws):
    thread.start_new_thread(run, (ws,))


def run(ws, *args):
    data = json.dumps(gen_params(appid=ws.appid, domain= ws.domain,question=ws.question))
    ws.send(data)


# 收到websocket消息的处理
def on_message(ws, message):
    # print(message)
    data = json.loads(message)
    # print(data)
    code = data['header']['code']
    if code != 0:
        print(f'请求错误: {code}, {data}')
        ws.close()
    else:
        choices = data["payload"]["choices"]
        status = choices["status"]
        content = choices["text"][0]["content"]
        print(content,end ="")
        # 尝试找到可以转成python对象的list结构
        # print(content)
        global answer
        answer += content

        #print(1)
        if status == 2:
            ws.close()


def gen_params(appid, domain,question):
    """
    通过appid和用户的提问来生成请参数
    """
    data = {
        "header": {
            "app_id": appid,
            "uid": "1234"
        },
        "parameter": {
            "chat": {
                "domain": domain,
                "random_threshold": 0.5,
                "temperature": 0.5,
                "max_tokens": 2048,
                "auditing": "default"
            }
        },
        "payload": {
            "message": {
                "text": question
            }
        }
    }
    return data


def main(appid, api_key, api_secret, Spark_url,domain, question):
    # print("星火:")
    wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
    # websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
    ws.appid = appid
    ws.question = question
    ws.domain = domain
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

SparkAPI和LangChain中间的一个调用层

下面代码是一个SparkAPI和LangChain的调用层,方便更好封装Spark的参数。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File    :   spark_middlerware.py
@Time    :   2023/11/01 18:44:52
@Author  :   CrissChan 
@Version :   1.0
@Site    :   https://blog.csdn.net/crisschan
@Desc    :   链接sparkapi的中间件,中间控制版本,token上限等
'''



import SparkApi
import os
from dotenv import load_dotenv, find_dotenv

#以下密钥信息从控制台获取
class SparkMiddleware(object):

    _=load_dotenv(find_dotenv())
    appid = os.getenv("SPARK_APP_ID")
    api_secret=os.getenv("SPARK_APP_SECRET")
    api_key=os.getenv("SPARK_APP_KEY")


    #用于配置大模型版本,默认“general/generalv2”
    # domain = "general"   # v1.5版本
    # domain = "generalv2"    # v2.0版本
    #domain = "generalv3"    # v3.0版本
    #云端环境的服务地址
    # Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat"  # v1.5环境的地址
    # Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat"  # v2.0环境的地址ws(s)://spark-api.xf-yun.com/v2.1/chat
    # Spark_url = "ws://spark-api.xf-yun.com/v3.1/chat"  # v3.0环境的地址ws(s)://spark-api.xf-yun.com/v3.1/chat
    # 定义了sparkdomain和url的dict,这样在输入的时候就可以自动匹配对应的版本地址
    domain_url = {"general":"ws://spark-api.xf-yun.com/v1.1/chat",
                  "generalv2":"ws://spark-api.xf-yun.com/v2.1/chat",
                  "generalv3":"ws://spark-api.xf-yun.com/v3.1/chat",
                  }

    text =[]

    '''
    @des     :spark middlerware的构造函数,创建一个和封装sparkapi调用的参数的中间层   
    @params  : 
              domain 代表需要调取spark的版本其中有三种值可选 "general"是v1.5版本,"generalv2"表示v2.0版本, "generalv3"表示v3.0版本,当前讯飞的星火就有三个版本
              role 代表角色,星火的有两个角色“user”表示是用户的问题,“assistant”表示AI的回复
    @return  :None

    '''
    def __init__(self,domain,role,content) -> None:
        self.text.clear
        self.__getText(role,content)
        SparkApi.main(self.appid,self.api_key,self.api_secret,self.domain_url[domain],domain,self.text)

        pass
    '''
    @des  :拼装成访问参数中的text需要的格式   
    @params  : role 代表角色,星火的有两个角色user表示是用户的问题,assistant表示AI的回复
               content是用户输入的问题
    @return  :None

    '''

    def __getText(self,role,content) -> None:

        jsoncon = {}
        jsoncon["role"] = role
        jsoncon["content"] = content
        self.text.append(jsoncon)
        # return self.text
        self.__checklen()


    '''
    @des  :获取这次传递给llm的prompt的长度

    @params  :None

    @return  :None

    '''
    def __getlength(self)-> None:
        length = 0
        for content in self.text:
            temp = content["content"]
            leng = len(temp)
            length += leng
        return length
    '''
    @des  :参数长度检查,如果全部的prompt的长度超过了8000,那么就删除这次拼装好的prompt

    @params  :None

    @return  :None

    '''


    def __checklen(self)-> None:
        while (self.__getlength() > 8000):
            del self.text[0]
        # return self.text
    '''
    @des  :获取LLM的反馈

    @params  :None

    @return  :string

    '''

    def response(self)-> str:
        return SparkApi.answer

SparkLLM继承LangChain的LLM

SparkLLM,继承了LangChain的LLM,参考了LangChain官方的CustomerLLM的写法。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File    :   iflytek.py
@Time    :   2023/10/27 17:28:58
@Author  :   CrissChan 
@Version :   1.0
@Site    :   https://blog.csdn.net/crisschan
@Desc    :   通过Langchain的customerLLM的方式,把讯飞的spark介入Langchain,按照Langchain的https://python.langchain.com/docs/modules/model_io/models/llms/custom_llm进行改写
'''
import logging
from typing import Any, List, Optional

from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.llms.base import LLM

from spark_middlerware import SparkMiddleware
class SparkLLM(LLM):
    #domain 代表需要调取spark的版本其中有三种值可选 "general"是v1.5版本,"generalv2"表示v2.0版本, "generalv3"表示v3.0版本,当前讯飞的星火就有三个版本
    domain :str
    @property
    def _llm_type(self) -> str:
        return "Spark"
    # @property
    # def _identifying_params(self) -> Mapping[str, Any]:
    #     """Get the identifying parameters."""
    #     _model_kwargs = self.model_kwargs or {}
    #     return {
    #         **{"endpoint_url": self.endpoint_url},
    #         **{"model_kwargs": _model_kwargs},
    #     }


    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        if stop is not None:
            raise ValueError("stop kwargs are not permitted.")
        # return prompt[: self.n]
        smw = SparkMiddleware(domain=self.domain,role='user',content=prompt)
        try:
            logging.debug("spark response :"+smw.response())
            return smw.response()
        except Exception as e:
            logging.debug(f"spark middlerware error :{e}")
            return "error"

总结

如上源代码拿来即可以用,其中需要Python的_变量以及将appid、api_secret、apikey存在项目根目录的env文件中就可以了。
如果对于
变量不熟悉可以学习:https://blog.csdn.net/crisschan/article/details/133277855?spm=1001.2014.3001.5501

PS

下载地址:https://download.csdn.net/download/chenlei_525/88496832

相关实践学习
AnalyticDB PostgreSQL 企业智能数据中台:一站式管理数据服务资产
企业在数据仓库之上可构建丰富的数据服务用以支持数据应用及业务场景;ADB PG推出全新企业智能数据平台,用以帮助用户一站式的管理企业数据服务资产,包括创建, 管理,探索, 监控等; 助力企业在现有平台之上快速构建起数据服务资产体系
目录
相关文章
|
6月前
|
云安全 人工智能 安全
Dify平台集成阿里云AI安全护栏,构建AI Runtime安全防线
阿里云 AI 安全护栏加入Dify平台,打造可信赖的 AI
3558 166
|
10月前
|
运维 安全 关系型数据库
【产品升级】Dataphin V5.1版本发布:跨云数据集成、指标管理、平台运维带来重大更新!
V5.1版本新增多项功能:对接AWS生态(支持Amazon EMR、Redshift等),强化研发技术支撑(如API认证升级、全量任务隔离),完善运营消费链路(新增业务指标管理、指标关系图),提升平台综合能力(自定义菜单、缩短升级停机时间)。这些功能助力企业实现高效数据治理与分析,未来还将拓展智能化与国际化支持。
509 0
|
6月前
|
存储 Prometheus 监控
136_生产监控:Prometheus集成 - 设置警报与指标选择与LLM部署监控最佳实践
在大语言模型(LLM)部署的生产环境中,有效的监控系统是确保服务稳定性、可靠性和性能的关键。随着LLM模型规模的不断扩大和应用场景的日益复杂,传统的监控手段已难以满足需求。Prometheus作为当前最流行的开源监控系统之一,凭借其强大的时序数据收集、查询和告警能力,已成为LLM部署监控的首选工具。
736 6
|
7月前
|
人工智能 安全 API
Dify平台集成安全护栏最佳实践
Dify平台提供低代码构建AI大模型应用的解决方案,支持云服务与私有化部署。本文介绍了在工作流和Agent中集成安全护栏的最佳实践,包括插件和扩展API两种方案。插件方式适用于工作流,一键安装实现输入输出防控;扩展API方式适用于Agent和工作流私有化部署场景,通过本地服务适配安全护栏API。文中还详细说明了操作步骤、前提条件及常见问题处理方法,帮助用户快速实现内容安全控制。
|
9月前
|
人工智能 搜索推荐 API
AI-Compass DeepSearch深度搜索生态:集成阿里ZeroSearch、字节DeerFlow、MindSearch等前沿平台,实现超越传统关键词匹配的智能信息检索革命
AI-Compass DeepSearch深度搜索生态:集成阿里ZeroSearch、字节DeerFlow、MindSearch等前沿平台,实现超越传统关键词匹配的智能信息检索革命
AI-Compass DeepSearch深度搜索生态:集成阿里ZeroSearch、字节DeerFlow、MindSearch等前沿平台,实现超越传统关键词匹配的智能信息检索革命
|
人工智能 网络协议 Java
RuoYi AI:1人搞定AI中台!开源全栈式AI开发平台,快速集成大模型+RAG+支付等模块
RuoYi AI 是一个全栈式 AI 开发平台,支持本地 RAG 方案,集成多种大语言模型和多媒体功能,适合企业和个人开发者快速搭建个性化 AI 应用。
2575 77
RuoYi AI:1人搞定AI中台!开源全栈式AI开发平台,快速集成大模型+RAG+支付等模块
|
8月前
|
供应链 监控 搜索推荐
35页PPT|零售行业自助数据分析方法论:指标体系构建平台集成、会员与商品精细化运营实践
在零售行业环境剧变的背景下,传统“人找货”模式正被“货找人”取代。消费者需求日益个性化,购买路径多元化,企业亟需构建统一的指标体系,借助BI平台实现数据驱动的精细化运营。本文从指标体系构建、平台集成到会员与商品运营实践,系统梳理零售经营分析的方法论,助力企业实现敏捷决策与业务闭环。
35页PPT|零售行业自助数据分析方法论:指标体系构建平台集成、会员与商品精细化运营实践
|
9月前
|
机器学习/深度学习 人工智能 监控
CI/CD与模型监控平台集成MLOps系统实现的全面路径
MLOps是机器学习模型在生产环境中持续优化、部署和维护的关键。通过CI/CD流水线和模型监控平台的结合,可以大大提高模型开发和运维的效率,实现高效、稳定的模型服务。随着AI技术的快速发展,MLOps将在企业级AI应用中发挥越来越重要的作用。
CI/CD与模型监控平台集成MLOps系统实现的全面路径
|
9月前
|
人工智能 JavaScript 安全
一文教你高效集成Qwen Code与ModelGate千万免费Toknn模型网关平台
本文详解如何高效集成Qwen Code与ModelGate模型网关平台,涵盖环境搭建、API配置、代码生成等关键步骤,助你实现智能编程与多模型管理,大幅提升AI开发效率。
|
12月前
|
存储 人工智能 监控
通过Milvus和Langchain快速构建基于百炼大模型的LLM问答系统
阿里云向量检索服务Milvus版是一款全托管向量检索引擎,并确保与开源Milvus的完全兼容性,支持无缝迁移。它在开源版本的基础上增强了可扩展性,能提供大规模AI向量数据的相似性检索服务。凭借其开箱即用的特性、灵活的扩展能力和全链路监控告警,Milvus云服务成为多样化AI应用场景的理想选择,包括多模态搜索、检索增强生成(RAG)、搜索推荐、内容风险识别等。您还可以利用开源的Attu工具进行可视化操作,进一步促进应用的快速开发和部署。
1319 4
下一篇
开通oss服务