【AI大模型应用开发】【LangChain系列】9. 实用技巧:大模型的流式输出在 OpenAI 和 LangChain 中的使用

本文涉及的产品
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
简介: 【AI大模型应用开发】【LangChain系列】9. 实用技巧:大模型的流式输出在 OpenAI 和 LangChain 中的使用
  • 大家好,我是同学小张,日常分享AI知识和实战案例
  • 欢迎 点赞 + 关注 👏,持续学习持续干货输出
  • +v: jasper_8017 一起交流💬,一起进步💪。
  • 微信公众号也可搜【同学小张】 🙏

本站文章一览:


当大模型的返回文字非常多时,返回完整的结果会耗费比较长的时间。如果等待大模型形成完整的答案再展示给用户,明显会给用户不好的体验。所以,现在市面上大多数的AI应用,在给用户结果时,都是以流式输出的方式展示给用户的。所谓的流式输出,就是类似打字机式的方式,一个字或一个词的输出,给用户一种答案逐渐出现的动画效果。

今天我们来学习下如何流式输出大模型的返回结果。本文将涵盖 LangChain 的流式输出方式和 OpenAI 原生的流式输出方式。

0. LangChain的流式输出 Streaming

0.1 实现流式输出

我们在 【AI大模型应用开发】【LangChain系列】实战案例4:再战RAG问答,提取在线网页数据,并返回生成答案的来源 代码的基础上,增加流式输出。

原代码:

import bs4
from langchain import hub
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
# Load, chunk and index the contents of the blog.
loader = WebBaseLoader(
    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
    bs_kwargs=dict(
        parse_only=bs4.SoupStrainer(
            class_=("post-content", "post-title", "post-header")
        )
    ),
)
docs = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings())
# Retrieve and generate using the relevant snippets of the blog.
retriever = vectorstore.as_retriever()
prompt = hub.pull("rlm/rag-prompt")
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)
from langchain_core.runnables import RunnableParallel
rag_chain_from_docs = (
    RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"])))
    | prompt
    | llm
    | StrOutputParser()
)
rag_chain_with_source = RunnableParallel(
    {"context": retriever, "question": RunnablePassthrough()}
).assign(answer=rag_chain_from_docs)
result = rag_chain_with_source.invoke("What is Task Decomposition")
print(result)

修改为流式输出:

# result = rag_chain_with_source.invoke("What is Task Decomposition")
# print(result)
for chunk in rag_chain_with_source.stream("What is Task Decomposition"):
    print(chunk)

修改方式很简单,LangChain的Chain中已经帮我们封装好了 stream 接口,调用该接口获取的结果即为流式输出的结果。其输出的结果如下(每次输出一个词,词前面加一个Key,用来标识这是答案的哪一部分):

我们可以利用Key来组装答案:

output = {}
curr_key = None
for chunk in rag_chain_with_source.stream("What is Task Decomposition"):
    for key in chunk:
        if key not in output:
            output[key] = chunk[key]
        else:
            output[key] += chunk[key]
        if key != curr_key:
            print(f"\n\n{key}: {chunk[key]}", end="", flush=True)
        else:
            print(chunk[key], end="", flush=True)
        curr_key = key

这样我们看到的答案的打印过程就是一个词一个词的出现了。最后展示完跟非流式输出一样。

1. OpenAI 原生的流式输出

1.1 启动 OpenAI 的流式输出

只需要在OpenAI接口调用时,将stream参数置为True,就启用了流式输出。

response = client.chat.completions.create(
    model = model,
    messages = messages,
    temperature = temperature,
    stream=True,    # 启动流式输出
)

1.2 流式输出结果组装

结果的组装过程如下,流式输出的结果在 msg.choices[0].delta 中存着:

text = ""
print("====Streaming====")
# 需要把 stream 里的 token 拼起来,才能得到完整的 call
for msg in response:
    delta = msg.choices[0].delta
    if delta.content:
        text_delta = delta.content
        print(text_delta)
        text = text + text_delta
print("====done!====")
if text:
    print(text)

1.3 完整的流式输出测试程序

from openai import OpenAI
# 加载 .env 到环境变量
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())
client = OpenAI()
###### 这里封装成函数 #######
def get_openai_chat_completion(messages, temperature, model = "gpt-3.5-turbo-1106"):
    response = client.chat.completions.create(
        model = model,
        messages = messages,
        temperature = temperature,
        stream=True,    # 启动流式输出
    )
    return response
SYSTEM_PROMPT = """
你是一名资深教师,你叫“同学小张”,用户会给你一个提示,你根据用户给的提示,来为用户设计关于此课程的学习大纲。
你必须遵循以下原则:
1. 你有足够的时间思考,确保在得出答案之前,你已经足够理解用户需求中的所有关键概念,并给出关键概念的解释。
2. 输出格式请使用Markdown格式, 并保证输出内容清晰易懂。
3. 至少输出10章的内容, 每章至少有5个小节
不要回答任何与课程内容无关的问题。
"""
if __name__ == "__main__":
    user_input = "大模型应用开发"
    
    messages = [
        {
            "role": "system",
            "content": SYSTEM_PROMPT,
        },
        {
            "role": "user",
            "content": user_input,
        }   
    ]
    response = get_openai_chat_completion(messages, 0.5)
    
    text = ""
    print("====Streaming====")
    # 需要把 stream 里的 token 拼起来,才能得到完整的 call
    for msg in response:
        delta = msg.choices[0].delta
        if delta.content:
            text_delta = delta.content
            print(text_delta)
            text = text + text_delta
    print("====done!====")
    if text:
        print(text)

流式输出过程如下:

组装后结果如下:

如果觉得本文对你有帮助,麻烦点个赞和关注呗 ~~~


  • 大家好,我是 同学小张,日常分享AI知识和实战案例
  • 欢迎 点赞 + 关注 👏,持续学习持续干货输出
  • +v: jasper_8017 一起交流💬,一起进步💪。
  • 微信公众号也可搜【同学小张】 🙏

本站文章一览:

相关实践学习
阿里云百炼xAnalyticDB PostgreSQL构建AIGC应用
通过该实验体验在阿里云百炼中构建企业专属知识库构建及应用全流程。同时体验使用ADB-PG向量检索引擎提供专属安全存储,保障企业数据隐私安全。
AnalyticDB PostgreSQL 企业智能数据中台:一站式管理数据服务资产
企业在数据仓库之上可构建丰富的数据服务用以支持数据应用及业务场景;ADB PG推出全新企业智能数据平台,用以帮助用户一站式的管理企业数据服务资产,包括创建, 管理,探索, 监控等; 助力企业在现有平台之上快速构建起数据服务资产体系
相关文章
|
8天前
|
存储 人工智能
|
14天前
|
人工智能 安全 机器人
OpenAI发布Model Spec,揭示其期望AI如何行动
OpenAI发布Model Spec,揭示其期望AI如何行动
OpenAI发布Model Spec,揭示其期望AI如何行动
|
19天前
|
人工智能 JSON API
|
20天前
|
人工智能 自然语言处理 前端开发
|
7天前
|
SQL 人工智能 SEO
|
16天前
|
JSON Go 网络架构
langchain 入门指南 - 自动选择不同的大模型
langchain 入门指南 - 自动选择不同的大模型
14 0
|
16天前
|
JSON 数据格式
langchain 入门指南 - JSON 形式输出大模型的响应
langchain 入门指南 - JSON 形式输出大模型的响应
32 0
|
16天前
|
API
langchain 入门指南(二)- 如何跟大模型对话
langchain 入门指南(二)- 如何跟大模型对话
17 0
|
机器学习/深度学习 人工智能 测试技术
|
2天前
|
机器学习/深度学习 传感器 人工智能
AI技术在医疗领域的应用与挑战
【8月更文挑战第29天】人工智能(AI)技术在医疗领域的应用日益广泛,为患者提供更精准、高效的医疗服务。本文将探讨AI技术在医疗领域的应用及其面临的挑战,包括诊断辅助、药物研发、患者管理和远程监测等方面。我们将通过实际案例和数据来展示AI技术在医疗领域的优势,并讨论其未来发展的可能性和潜在问题。

热门文章

最新文章

下一篇
云函数