- 大家好,我是同学小张,日常分享AI知识和实战案例
- 欢迎 点赞 + 关注 👏,持续学习,持续干货输出。
- +v: jasper_8017 一起交流💬,一起进步💪。
- 微信公众号也可搜【同学小张】 🙏
本站文章一览:
当大模型的返回文字非常多时,返回完整的结果会耗费比较长的时间。如果等待大模型形成完整的答案再展示给用户,明显会给用户不好的体验。所以,现在市面上大多数的AI应用,在给用户结果时,都是以流式输出的方式展示给用户的。所谓的流式输出,就是类似打字机式的方式,一个字或一个词的输出,给用户一种答案逐渐出现的动画效果。
今天我们来学习下如何流式输出大模型的返回结果。本文将涵盖 LangChain 的流式输出方式和 OpenAI 原生的流式输出方式。
0. LangChain的流式输出 Streaming
0.1 实现流式输出
我们在 【AI大模型应用开发】【LangChain系列】实战案例4:再战RAG问答,提取在线网页数据,并返回生成答案的来源 代码的基础上,增加流式输出。
原代码:
import bs4 from langchain import hub from langchain_community.document_loaders import WebBaseLoader from langchain_community.vectorstores import Chroma from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnablePassthrough from langchain_openai import ChatOpenAI, OpenAIEmbeddings from langchain_text_splitters import RecursiveCharacterTextSplitter # Load, chunk and index the contents of the blog. loader = WebBaseLoader( web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",), bs_kwargs=dict( parse_only=bs4.SoupStrainer( class_=("post-content", "post-title", "post-header") ) ), ) docs = loader.load() text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200) splits = text_splitter.split_documents(docs) vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings()) # Retrieve and generate using the relevant snippets of the blog. retriever = vectorstore.as_retriever() prompt = hub.pull("rlm/rag-prompt") llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0) def format_docs(docs): return "\n\n".join(doc.page_content for doc in docs) from langchain_core.runnables import RunnableParallel rag_chain_from_docs = ( RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"]))) | prompt | llm | StrOutputParser() ) rag_chain_with_source = RunnableParallel( {"context": retriever, "question": RunnablePassthrough()} ).assign(answer=rag_chain_from_docs) result = rag_chain_with_source.invoke("What is Task Decomposition") print(result)
修改为流式输出:
# result = rag_chain_with_source.invoke("What is Task Decomposition") # print(result) for chunk in rag_chain_with_source.stream("What is Task Decomposition"): print(chunk)
修改方式很简单,LangChain的Chain中已经帮我们封装好了 stream 接口,调用该接口获取的结果即为流式输出的结果。其输出的结果如下(每次输出一个词,词前面加一个Key,用来标识这是答案的哪一部分):
我们可以利用Key来组装答案:
output = {} curr_key = None for chunk in rag_chain_with_source.stream("What is Task Decomposition"): for key in chunk: if key not in output: output[key] = chunk[key] else: output[key] += chunk[key] if key != curr_key: print(f"\n\n{key}: {chunk[key]}", end="", flush=True) else: print(chunk[key], end="", flush=True) curr_key = key
这样我们看到的答案的打印过程就是一个词一个词的出现了。最后展示完跟非流式输出一样。
1. OpenAI 原生的流式输出
1.1 启动 OpenAI 的流式输出
只需要在OpenAI接口调用时,将stream参数置为True,就启用了流式输出。
response = client.chat.completions.create( model = model, messages = messages, temperature = temperature, stream=True, # 启动流式输出 )
1.2 流式输出结果组装
结果的组装过程如下,流式输出的结果在 msg.choices[0].delta
中存着:
text = "" print("====Streaming====") # 需要把 stream 里的 token 拼起来,才能得到完整的 call for msg in response: delta = msg.choices[0].delta if delta.content: text_delta = delta.content print(text_delta) text = text + text_delta print("====done!====") if text: print(text)
1.3 完整的流式输出测试程序
from openai import OpenAI # 加载 .env 到环境变量 from dotenv import load_dotenv, find_dotenv _ = load_dotenv(find_dotenv()) client = OpenAI() ###### 这里封装成函数 ####### def get_openai_chat_completion(messages, temperature, model = "gpt-3.5-turbo-1106"): response = client.chat.completions.create( model = model, messages = messages, temperature = temperature, stream=True, # 启动流式输出 ) return response SYSTEM_PROMPT = """ 你是一名资深教师,你叫“同学小张”,用户会给你一个提示,你根据用户给的提示,来为用户设计关于此课程的学习大纲。 你必须遵循以下原则: 1. 你有足够的时间思考,确保在得出答案之前,你已经足够理解用户需求中的所有关键概念,并给出关键概念的解释。 2. 输出格式请使用Markdown格式, 并保证输出内容清晰易懂。 3. 至少输出10章的内容, 每章至少有5个小节 不要回答任何与课程内容无关的问题。 """ if __name__ == "__main__": user_input = "大模型应用开发" messages = [ { "role": "system", "content": SYSTEM_PROMPT, }, { "role": "user", "content": user_input, } ] response = get_openai_chat_completion(messages, 0.5) text = "" print("====Streaming====") # 需要把 stream 里的 token 拼起来,才能得到完整的 call for msg in response: delta = msg.choices[0].delta if delta.content: text_delta = delta.content print(text_delta) text = text + text_delta print("====done!====") if text: print(text)
流式输出过程如下:
组装后结果如下:
如果觉得本文对你有帮助,麻烦点个赞和关注呗 ~~~
- 大家好,我是 同学小张,日常分享AI知识和实战案例
- 欢迎 点赞 + 关注 👏,持续学习,持续干货输出。
- +v: jasper_8017 一起交流💬,一起进步💪。
- 微信公众号也可搜【同学小张】 🙏
本站文章一览: