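Today we look at how to stream the output of a large language model. This post covers two approaches: streaming through LangChain and streaming through the native OpenAI API.
0. Streaming output with LangChain
0.1 Implementing streaming output
We start from the code in [AI LLM Application Development] [LangChain Series] Practical Case 4: RAG Q&A Revisited, Extracting Online Web Page Data and Returning the Sources of the Generated Answer, and add streaming output to it.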
import bs4
from langchain import hub
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Load, chunk and index the contents of the blog.
loader = WebBaseLoader(
    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
    bs_kwargs=dict(
        parse_only=bs4.SoupStrainer(
            class_=("post-content", "post-title", "post-header")
        )
    ),
)
docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings())

# Retrieve and generate using the relevant snippets of the blog.
retriever = vectorstore.as_retriever()
prompt = hub.pull("rlm/rag-prompt")
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)


def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


rag_chain_from_docs = (
    RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"])))
    | prompt
    | llm
    | StrOutputParser()
)

rag_chain_with_source = RunnableParallel(
    {"context": retriever, "question": RunnablePassthrough()}
).assign(answer=rag_chain_from_docs)

result = rag_chain_with_source.invoke("What is Task Decomposition")
print(result)
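# Before: one blocking call that returns the complete result.
# result = rag_chain_with_source.invoke("What is Task Decomposition")
# print(result)

# After: iterate over the chunks as they are produced.
for chunk in rag_chain_with_source.stream("What is Task Decomposition"):
    print(chunk)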
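The change is small: LangChain chains already expose a stream interface, and iterating over what it returns yields the result piece by piece. Each chunk is a dict that holds one piece of the result (roughly one word of the answer at a time) under a key that identifies which part of the result it belongs to. The following loop merges the chunks per key and prints them as they arrive: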
output = {}
curr_key = None
for chunk in rag_chain_with_source.stream("What is Task Decomposition"):
    for key in chunk:
        if key not in output:
            output[key] = chunk[key]
        else:
            output[key] += chunk[key]
        if key != curr_key:
            print(f"\n\n{key}: {chunk[key]}", end="", flush=True)
        else:
            print(chunk[key], end="", flush=True)
        curr_key = key
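To see how the per-key merging behaves without calling a model, here is a minimal sketch that runs offline. The fake_rag_stream generator and its data are hypothetical stand-ins for rag_chain_with_source.stream(...). In particular, the context entries are plain strings here, whereas the real chain yields Document objects. The merge relies only on the fact that both strings and lists support +.

```python
from typing import Any, Dict, Iterable, Iterator


def fake_rag_stream() -> Iterator[Dict[str, Any]]:
    """Hypothetical stand-in for rag_chain_with_source.stream(...)."""
    yield {"question": "What is Task Decomposition"}
    # Assumption: plain strings instead of the Document objects a real chain yields.
    yield {"context": ["doc 1 text", "doc 2 text"]}
    for piece in ["", "Task", " decomposition", " splits", " a", " task", "."]:
        yield {"answer": piece}


def merge_chunks(chunks: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Merge streamed chunks per key, using + for both str and list values."""
    output: Dict[str, Any] = {}
    for chunk in chunks:
        for key, value in chunk.items():
            if key in output:
                output[key] = output[key] + value
            else:
                output[key] = value
    return output


merged = merge_chunks(fake_rag_stream())
print(merged["answer"])
```

Keeping the merge in its own function separates accumulation from printing, so the same logic can feed a terminal, a log, or a web response.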
1. Native streaming output with OpenAI
1.1 Enabling streaming output in OpenAI
response = client.chat.completions.create(
    model=model,
    messages=messages,
    temperature=temperature,
    stream=True,  # enable streaming output
)
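1.2 Assembling the streamed result
The result is assembled as follows. Each streamed message carries its piece of text in msg.choices[0].delta, and the content of that delta can be empty, so it is checked before being appended.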
text = ""
print("====Streaming====")
# The pieces in the stream must be concatenated to obtain the complete response.
for msg in response:
    delta = msg.choices[0].delta
    if delta.content:
        text_delta = delta.content
        print(text_delta)  # each piece is printed on its own line
        text = text + text_delta
print("====done!====")
if text:
    print(text)
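The assembly logic can also be tried without an API key. The sketch below is an illustration under stated assumptions, not the OpenAI client itself: make_chunk is a hypothetical helper that builds objects with the same choices[0].delta.content shape as the streamed messages, and iter_text_deltas is a hypothetical generator that filters out empty pieces. The check for an empty choices list is a defensive assumption rather than something the original code needs.

```python
from types import SimpleNamespace
from typing import Iterable, Iterator, Optional


def make_chunk(content: Optional[str]) -> SimpleNamespace:
    """Build a hypothetical object shaped like a streamed chat completion chunk."""
    delta = SimpleNamespace(content=content)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


def iter_text_deltas(stream: Iterable) -> Iterator[str]:
    """Yield only the non-empty text pieces from a stream of chunks."""
    for msg in stream:
        if not msg.choices:  # defensive assumption: skip chunks without choices
            continue
        content = msg.choices[0].delta.content
        if content:  # content may be None or an empty string
            yield content


# Hypothetical stream: an empty first piece and a None last piece.
fake_stream = [make_chunk(c) for c in ["", "Hello", ", ", "world", None]]
pieces = list(iter_text_deltas(fake_stream))
text = "".join(pieces)
print(text)
```

Collecting the pieces in a list and joining once avoids repeated string concatenation, which can matter for long responses.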
1.3 Complete streaming test program
from openai import OpenAI

# Load .env into the environment variables
from dotenv import load_dotenv, find_dotenv

_ = load_dotenv(find_dotenv())

client = OpenAI()


###### Wrapped in a function here #######
def get_openai_chat_completion(messages, temperature, model="gpt-3.5-turbo-1106"):
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=temperature,
        stream=True,  # enable streaming output
    )
    return response


SYSTEM_PROMPT = """
You are a senior teacher named “同学小张”. The user will give you a prompt, and based on that prompt you design a study outline for the course.
You must follow these principles:
1. You have enough time to think. Before answering, make sure you fully understand all key concepts in the user's request, and explain those key concepts.
2. Use Markdown for the output and keep the content clear and easy to understand.
3. Output at least 10 chapters, each with at least 5 sections.
Do not answer any question unrelated to the course content.
"""

if __name__ == "__main__":
    user_input = "LLM application development"
    messages = [
        {
            "role": "system",
            "content": SYSTEM_PROMPT,
        },
        {
            "role": "user",
            "content": user_input,
        },
    ]
    response = get_openai_chat_completion(messages, 0.5)

    text = ""
    print("====Streaming====")
    # The pieces in the stream must be concatenated to obtain the complete response.
    for msg in response:
        delta = msg.choices[0].delta
        if delta.content:
            text_delta = delta.content
            print(text_delta)  # each piece is printed on its own line
            text = text + text_delta
    print("====done!====")
    if text:
        print(text)
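The program above prints each delta on its own line, which breaks up the Markdown outline while it streams. One possible variation, sketched here under assumptions, writes the deltas back to back and flushes after each one, so the text grows on screen the way it does in a chat window. The print_stream function and the sample deltas are hypothetical. In the real program the deltas would be the delta.content values read from the response.

```python
import io
import sys
from typing import Iterable, TextIO


def print_stream(deltas: Iterable[str], out: TextIO = sys.stdout) -> str:
    """Write each delta as it arrives, without extra newlines, and return the full text."""
    parts = []
    for delta in deltas:
        out.write(delta)
        out.flush()  # push the partial text to the terminal immediately
        parts.append(delta)
    out.write("\n")  # finish the line once the stream ends
    return "".join(parts)


# Hypothetical deltas standing in for the content of a real stream.
buffer = io.StringIO()
full_text = print_stream(["# Outline", "\n", "1. ", "Basics"], out=buffer)
```

Passing the output target as a parameter keeps the function easy to check with an in-memory buffer. Called without the out argument, it writes to the terminal.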
