基于百炼 qwen plus的上市企业ESG图谱构建工作
# 安装包(Python >= 3.7):pip install qianfan
import json
import pandas as pd
from openai import OpenAI
client = OpenAI(
# 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx",
api_key="sk-XXX",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
# print(completion.model_dump_json())
NavigationAdvisory = json.loads(
pd.read_excel("../data/上市公司ESG报告基于非限制性图谱类别性质的大模型生成.xlsx").to_json(orient="records", force_ascii=False))
# print(NavigationAdvisory)
b = []
for NavigationAdvisory in NavigationAdvisory:
for i in NavigationAdvisory['字段1'].split("\n"):
if len(i) > 64:
prompt_message = """### ESG报告的知识图谱抽取指令
knowledge_graph:
entities:
- name: "企业名称"
type: "公司"
attributes:
- {name: "行业", value: "科技"}
- {name: "总部所在地", value: "北京"}
- {name: "市值", value: "500亿人民币"}
- name: "碳排放量"
type: "环境指标"
attributes:
- {name: "范围1", value: "100万吨CO2"}
- {name: "范围2", value: "50万吨CO2"}
- {name: "范围3", value: "30万吨CO2"}
- name: "员工福利计划"
type: "社会指标"
attributes:
- {name: "福利种类", value: "健康保险、培训津贴"}
- {name: "覆盖率", value: "90%"}
- name: "董事会多样性"
type: "治理指标"
attributes:
- {name: "女性董事比例", value: "30%"}
- {name: "独立董事比例", value: "50%"}
relationships:
- {from: "企业名称", to: "碳排放量", relationship: "环境绩效指标"}
- {from: "企业名称", to: "员工福利计划", relationship: "社会绩效指标"}
- {from: "企业名称", to: "董事会多样性", relationship: "治理绩效指标"}
### 任务指令优化
#### **目标:**
- **全面性**:确保知识图谱涵盖ESG报告的核心内容,包括环境、社会、治理三个维度。
- **动态性**:能反映企业ESG指标的时间变化和对不同指标的改进。
- **结构清晰**:优化YAML结构,确保涵盖属性信息,清晰展示指标与企业的关联。
#### **具体优化:**
1. 抽取对象:ESG报告中的企业信息、环境指标(碳排放量、资源使用)、社会指标(员工权益、供应链管理)、治理指标(董事会多样性、反腐败政策)。
2. 属性:每个实体包含关键属性,确保涵盖指标的量化信息及描述性细节。
3. 关系:明确各类指标与企业之间的关联关系。
---
#### **指令模板:**
1. **输入内容**:
提供上市公司ESG报告文本,包括关键指标说明(如碳排放量、员工培训计划、治理实践等)。
2. **输出内容**:
按以下结构提取内容并输出:
```yaml
knowledge_graph:
entities:
- name: "实体名称"
type: "实体类型"
attributes:
- {name: "属性名称", value: "属性值"}
relationships:
- {from: "实体1名称", to: "实体2名称", relationship: "关系描述"}
3. **基于以下内容构建ESG知识图谱**:
示例文本:
公司A总部位于上海,行业为消费品,2023年碳排放量范围1为50万吨CO2,范围2为30万吨CO2。董事会成员中,女性比例为40%,独立董事比例为60%。公司为员工提供全面的健康保险,覆盖率为95%。此外,公司在供应链管理中执行严格的环保标准。
**输出:**
knowledge_graph:
entities:
- name: "公司A"
type: "公司"
attributes:
- {name: "行业", value: "消费品"}
- {name: "总部所在地", value: "上海"}
- name: "碳排放量"
type: "环境指标"
attributes:
- {name: "范围1", value: "50万吨CO2"}
- {name: "范围2", value: "30万吨CO2"}
- name: "董事会多样性"
type: "治理指标"
attributes:
- {name: "女性董事比例", value: "40%"}
- {name: "独立董事比例", value: "60%"}
- name: "员工福利计划"
type: "社会指标"
attributes:
- {name: "福利种类", value: "健康保险"}
- {name: "覆盖率", value: "95%"}
- name: "供应链管理"
type: "治理指标"
attributes:
- {name: "环保标准", value: "严格"}
relationships:
- {from: "公司A", to: "碳排放量", relationship: "环境绩效指标"}
- {from: "公司A", to: "董事会多样性", relationship: "治理绩效指标"}
- {from: "公司A", to: "员工福利计划", relationship: "社会绩效指标"}
- {from: "公司A", to: "供应链管理", relationship: "治理实践"}
""" + i
completion = client.chat.completions.create(
model="qwen-plus", # 模型列表:https://help.aliyun.com/zh/model-studio/getting-started/models
messages=[
{
"role": "user",
"content": prompt_message
},
{
"role": "user",
"content": "未知的信息不要给我返回 以yaml结构返回结果"}
],
)
import json
resp_body_result = json.loads(completion.model_dump_json())["choices"][0]["message"]['content']
open("上市公司ESG报告基于非限制性图谱类别性质的大模型生成_yaml_builder_2024年11月22日.md", "a", encoding='utf-8').write(i + "\n")
open("上市公司ESG报告基于非限制性图谱类别性质的大模型生成_yaml_builder_2024年11月22日.md", "a", encoding='utf-8').write(
resp_body_result + "\n")
b.append({
"input": i, "output": resp_body_result, "prompt": str(prompt_message)})
pd.DataFrame(b).to_excel("上市公司ESG报告基于非限制性图谱类别性质的大模型生成_yaml_builder_2024年11月22日.xlsx", index=False)
代码功能解读
主要功能:
这段代码用于通过调用阿里云的 OpenAI
服务生成上市公司 ESG 报告的知识图谱。输入是一个包含 ESG 报告的 Excel 文件,代码逐条处理每条记录,构建适用于 ESG 报告的知识图谱,并以 YAML 格式输出。
代码分步解读:
必要库导入:
import json import pandas as pd from openai import OpenAI
json
: 处理 JSON 数据。pandas
: 用于读取和处理 Excel 数据。OpenAI
: 调用大模型的 API。
API 客户端初始化:
client = OpenAI( api_key="sk-XXX", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", )
- 初始化 OpenAI 客户端,配置访问阿里云大模型服务的
API Key
和base_url
。
- 初始化 OpenAI 客户端,配置访问阿里云大模型服务的
加载数据:
NavigationAdvisory = json.loads( pd.read_excel("../data/上市公司ESG报告基于非限制性图谱类别性质的大模型生成.xlsx").to_json(orient="records", force_ascii=False))
- 读取包含 ESG 报告的 Excel 文件,将其转换为 JSON 格式的记录列表。
- 每条记录表示一段文本或相关数据。
逐条处理数据:
for NavigationAdvisory in NavigationAdvisory: for i in NavigationAdvisory['字段1'].split("\n"):
- 遍历每条记录中的
字段1
(可能包含多行内容),并逐行处理。
- 遍历每条记录中的
生成 Prompt:
- 若内容长度超过 64 个字符,将其拼接到预定义的 Prompt 模板中。
prompt_message = """### ESG报告的知识图谱抽取指令优化 ...(模板内容省略) """ + i
- 具体包括 YAML 知识图谱结构和操作说明,用以指导模型生成输出。
- 若内容长度超过 64 个字符,将其拼接到预定义的 Prompt 模板中。
调用大模型 API:
completion = client.chat.completions.create( model="qwen-plus", messages=[ { "role": "user", "content": prompt_message}, { "role": "user", "content": "未知的信息不要给我返回 以yaml结构返回结果"} ], )
- 使用
qwen-plus
模型,根据 Prompt 生成对应的 YAML 知识图谱。
- 使用
处理和保存响应:
resp_body_result = json.loads(completion.model_dump_json())["choices"][0]["message"]['content'] open("..._yaml_builder_2024年11月22日.md", "a", encoding='utf-8').write(i + "\n") open("..._yaml_builder_2024年11月22日.md", "a", encoding='utf-8').write(resp_body_result + "\n")
- 将输入内容和生成的 YAML 输出追加保存到 Markdown 文件中。
存储数据记录:
b.append({ "input": i, "output": resp_body_result, "prompt": str(prompt_message)}) pd.DataFrame(b).to_excel("..._yaml_builder_2024年11月22日.xlsx", index=False)
- 保存输入内容、生成的 YAML 输出以及对应的 Prompt 记录到 Excel 文件中。
功能特点:
数据输入:
- 从 Excel 文件中读取 ESG 报告的内容,逐条进行处理。
任务抽取:
- 生成包含公司、环境、社会和治理等 ESG 指标的知识图谱结构,确保 YAML 格式清晰。
输出保存:
- 输出以 Markdown 和 Excel 文件保存,便于后续分析和使用。
优化建议:
批量处理效率:
- 使用
batch
模式将多个Prompt
一次性传递给大模型,减少 API 调用次数。
- 使用
错误处理:
- 对可能的 API 请求错误或数据格式问题增加
try-except
。
- 对可能的 API 请求错误或数据格式问题增加
配置解耦:
- 将
api_key
和文件路径等配置项提取到配置文件,提升代码可移植性。
- 将
动态模型选择:
- 根据任务复杂度动态调整使用的模型(如
qwen-plus
或其他更适配的模型)。
- 根据任务复杂度动态调整使用的模型(如
通过魔搭社区下载qwen2.5 7B Instruct模型
#模型下载
from modelscope import snapshot_download
model_dir = snapshot_download('Qwen/Qwen2.5-7B-Instruct')
基于vllm部署qwen2.5 7B Instruct 模型
vllm serve .cache/modelscope/hub/qwen/Qwen2___5-7B-Instruct --served-model-name Qwen2___5-7B-Instruct --enable-auto-tool-choice --tool-call-parser hermes --max-model-len=32768 --tensor-parallel-size 2 --port 8000
通过多轮对话能力重新生成非标准yaml,提升转换提升率
import re
import yaml
# YAML 内容(从字符串加载)
import json
import requests
# 定义一个远程任务函数
def process_file(data):
print(data)
headers = {
"Content-Type": "application/json"
}
response = requests.post("http://local:8000/v1/chat/completions", headers=headers,
data=json.dumps(data))
if response.status_code == 200:
data = response.json()
content = data['choices'][0]['message']['content']
return content
else:
return f"请求失败,状态码:{response.status_code}"
# process_file()
# 数据库格式转换函数
def yaml_to_kg(data):
knowledge_graph = {
"entities": [], "relations": [], "events": [], "timeline": []}
# 处理实体
entities = data.get("military_knowledge_graph", {
}).get("entities", [])
for entity in entities:
knowledge_graph["entities"].append({
"name": entity["name"],
"type": entity["type"],
"attributes": entity.get("attributes", {
})
})
# 处理关系
relations = data.get("military_knowledge_graph", {
}).get("relations", [])
for relation in relations:
knowledge_graph["relations"].append({
"subject": relation["subject"],
"predicate": relation["predicate"],
"object": relation["object"]
})
# 处理事件
events = data.get("military_event_graph", {
}).get("events", [])
for event in events:
knowledge_graph["events"].append({
"name": event["name"],
"type": event["type"],
"participants": event.get("participants", []),
"location": event.get("location", ""),
"description": event.get("description", ""),
"date": event.get("date", "未知")
})
# 处理时间线
timeline = data.get("military_temporal_graph", {
}).get("timeline", [])
for time_event in timeline:
knowledge_graph["timeline"].append({
"event": time_event["event"],
"timestamp": time_event["timestamp"],
"relation_to_next_event": time_event.get("relation_to_next_event", ""),
"next_event": time_event.get("next_event", "")
})
return knowledge_graph
import pandas as pd
def str_yaml(text):
# 修正正则表达式以匹配完整的YAML内容,并确保处理可能的空列表
knowledge_graph_content = re.findall(r"(military_knowledge_graph:.*?)(?=military_event_graph:)", text, re.DOTALL)
event_graph_content = re.findall(r"(military_event_graph:.*?)(?=military_temporal_graph:)", text, re.DOTALL)
temporal_graph_content = re.findall(r"(military_temporal_graph:.*?)(?=\n\n)", text, re.DOTALL)
# 将提取的内容合并
yaml_content = (knowledge_graph_content[0] if knowledge_graph_content else "") + \
(event_graph_content[0] if event_graph_content else "") + \
(temporal_graph_content[0] if temporal_graph_content else "")
# yaml_content if yaml_content else "No YAML content found"
return yaml_content
export = []
yaml_better = pd.read_excel("./上市公司ESG报告大模型抽取非限定schema图谱.xlsx").values.tolist()
for yaml_better in yaml_better:
# 加载 YAML 数据
try:
if "```yaml" in yaml_better[1]:
yaml_content = str_yaml(yaml_better[1])
else:
yaml_content = yaml_better[1]
data = yaml.safe_load(yaml_content)
# print(data)
if data:
export.append(data)
except Exception as e:
print(e)
print(yaml_better[1])
data = {
'model': 'Qwen2___5-7B-Instruct', 'messages':
[
{
'role': 'system', 'content': '''异常解析结果:''' + str(e)},
{
'role': 'system', 'content': yaml_better[1]},
{
'role': 'system', 'content': '根据异常重新整理yaml结构并返回'}
]
}
data = process_file(data)
yaml_content = str_yaml(data)
try:
data = yaml.safe_load(yaml_content)
except:
data =None
continue
# print(data)
if data:
export.append(data)
continue
pd.DataFrame(export).to_excel("上市公司ESG_knowledge.xlsx")
以下是对上述Python代码的详细解读:
1. 导入必要的库
import re
import yaml
import json
import requests
import pandas as pd
这段代码导入了五个常用的Python库:
re
:Python的标准正则表达式库,用于处理文本匹配和搜索等操作,在这里用于从文本中提取符合特定模式的内容(如提取YAML格式相关部分)。yaml
:用于解析和处理YAML格式数据的库,方便将YAML格式的字符串转换为Python数据结构(如字典、列表等),以及反向操作。json
:用于处理JSON格式数据的库,在和网络接口交互(发送和接收数据)时,常需要将Python对象转换为JSON字符串(通过json.dumps
)以及将接收到的JSON格式响应转换为Python对象(通过response.json
)。requests
:一个简洁易用的HTTP库,用于发送HTTP请求(如代码中的POST
请求)到指定的服务器地址,获取响应数据等,方便和网络服务进行交互。pandas
:强大的数据处理和分析库,在这里主要用于读取和写入Excel文件,便于数据的批量处理和存储。
2. 远程任务函数 process_file
def process_file(data):
print(data)
headers = {
"Content-Type": "application/json"
}
response = requests.post("http://local:8000/v1/chat/completions", headers=headers,
data=json.dumps(data))
if response.status_code == 200:
data = response.json()
content = data['choices'][0]['message']['content']
return content
else:
return f"请求失败,状态码:{response.status_code}"
这个函数用于向本地的http://local:8000/v1/chat/completions
接口发送一个POST
请求,并期望得到JSON格式的响应数据。具体功能如下:
- 首先,定义了请求头
headers
,设置Content-Type
为application/json
,表明发送的数据将是JSON格式。 - 然后,使用
requests.post
方法发送请求,将传入的data
参数(应该是一个Python对象,会先通过json.dumps
转换为JSON字符串)发送给指定接口。 - 如果响应的状态码是
200
(表示请求成功),则使用response.json
方法将响应内容(JSON格式)转换为Python对象,并从中提取出聊天回复的内容(从响应结构中按特定键值路径获取,即data['choices'][0]['message']['content']
)返回。 - 如果响应状态码不是
200
,则返回一个包含请求失败状态码信息的字符串,告知调用者请求出现问题。
3. 数据库格式转换函数 yaml_to_kg
def yaml_to_kg(data):
knowledge_graph = {
"entities": [], "relations": [], "events": [], "timeline": []}
# 处理实体
entities = data.get("military_knowledge_graph", {
}).get("entities", [])
for entity in entities:
knowledge_graph["entities"].append({
"name": entity["name"],
"type": entity["type"],
"attributes": entity.get("attributes", {
})
})
# 处理关系
relations = data.get("military_knowledge_graph", {
}).get("relations", [])
for relation in relations:
knowledge_graph["relations"].append({
"subject": relation["subject"],
"predicate": relation["predicate"],
"object": relation["object"]
})
# 处理事件
events = data.get("military_event_graph", {
}).get("events", [])
for event in events:
knowledge_graph["events"].append({
"name": event["name"],
"type": event["type"],
"participants": event.get("participants", []),
"location": event.get("location", ""),
"description": event.get("description", ""),
"date": event.get("date", "未知")
})
# 处理时间线
timeline = data.get("military_temporal_graph", {
}).get("timeline", [])
for time_event in timeline:
knowledge_graph["timeline"].append({
"event": time_event["event"],
"timestamp": time_event["timestamp"],
"relation_to_next_event": time_event.get("relation_to_next_event", ""),
"next_event": time_event.get("next_event", "")
})
return knowledge_graph
这个函数的主要作用是将特定格式的数据(可能从YAML解析后得到的数据结构)转换为一种统一的知识图谱格式的Python字典结构。具体步骤如下:
- 首先初始化一个名为
knowledge_graph
的空字典,包含了"entities"
、"relations"
、"events"
和"timeline"
四个空列表,用于后续填充相应的信息。 - 处理实体部分:从输入的
data
中获取"military_knowledge_graph"
字典下的"entities"
列表(如果不存在则默认为空列表),然后遍历每个实体字典,提取"name"
、"type"
以及"attributes"
(如果不存在默认为空字典)信息,构建新的实体字典格式并添加到knowledge_graph["entities"]
列表中。 - 处理关系部分:类似地,从
data
中获取"military_knowledge_graph"
下的"relations"
列表,遍历每个关系字典,提取"subject"
、"predicate"
、"object"
信息构建新的关系字典,添加到knowledge_graph["relations"]
列表中。 - 处理事件部分:从
data
中获取"military_event_graph"
下的"events"
列表,对于每个事件字典,提取"name"
、"type"
、"participants"
(不存在则默认为空列表)、"location"
(不存在则为空字符串)、"description"
(不存在则为空字符串)以及"date"
(不存在则设为"未知"
)信息构建新的事件字典,添加到knowledge_graph["events"]
列表中。 - 处理时间线部分:从
data
中获取"military_temporal_graph"
下的"timeline"
列表,遍历每个时间线相关的字典,提取"event"
、"timestamp"
、"relation_to_next_event"
(不存在则为空字符串)、"next_event"
(不存在则为空字符串)信息构建新的时间线字典,添加到knowledge_graph["timeline"]
列表中。 - 最后返回构建好的
knowledge_graph
字典,它代表了整理后的知识图谱结构数据。
4. 处理YAML文本提取内容的函数 str_yaml
def str_yaml(text):
# 修正正则表达式以匹配完整的YAML内容,并确保处理可能的空列表
knowledge_graph_content = re.findall(r"(military_knowledge_graph:.*?)(?=military_event_graph:)", text, re.DOTALL)
event_graph_content = re.findall(r"(military_event_graph:.*?)(?=military_temporal_graph:)", text, re.DOTALL)
temporal_graph_content = re.findall(r"(military_temporal_graph:.*?)(?=\n\n)", text, re.DOTALL)
# 将提取的内容合并
yaml_content = (knowledge_graph_content[0] if knowledge_graph_content else "") + \
(event_graph_content[0] if event_graph_content else "") + \
(temporal_graph_content[0] if temporal_graph_content else "")
return yaml_content
这个函数用于从给定的文本中提取特定YAML格式相关的内容,主要通过正则表达式来匹配不同部分的YAML内容,具体如下:
- 使用
re.findall
函数结合不同的正则表达式模式来查找文本中符合条件的内容。- 第一个正则表达式
r"(military_knowledge_graph:.*?)(?=military_event_graph:)"
(使用了非贪婪匹配.*?
和正向预查(?=...)
)用于查找从military_knowledge_graph:
开始到下一个military_event_graph:
之前的内容,这部分内容可能是知识图谱相关的YAML部分。 - 第二个正则表达式
r"(military_event_graph:.*?)(?=military_temporal_graph:)"
类似,用于查找事件图谱相关的YAML部分,即从military_event_graph:
开始到military_temporal_graph:
之前的内容。 - 第三个正则表达式
r"(military_temporal_graph:.*?)(?=\n\n)"
用于查找时间线相关的YAML部分,从military_temporal_graph:
开始到连续两个换行符之前的内容。
- 第一个正则表达式
- 然后将提取到的这三部分内容(如果提取到了,否则为空字符串)按顺序合并起来,作为最终提取的YAML内容返回。
5. 主程序部分
export = []
yaml_better = pd.read_excel("./上市公司ESG报告大模型抽取非限定schema图谱.xlsx").values.tolist()
for yaml_better in yaml_better:
# 加载 YAML 数据
try:
if "```yaml" in yaml_better[1]:
yaml_content = str_yaml(yaml_better[1])
else:
yaml_content = yaml_better[1]
data = yaml.safe_load(yaml_content)
if data:
export.append(data)
except Exception as e:
print(e)
print(yaml_better[1])
data = {
'model': 'Qwen2___5-7B-Instruct', 'messages':
[
{
'role': 'system', 'content': '''异常解析结果:''' + str(e)},
{
'role': 'system', 'content': yaml_better[1]},
{
'role': 'system', 'content': '根据异常重新整理yaml结构并返回'}
]
}
data = process_file(data)
yaml_content = str_yaml(data)
try:
data = yaml.safe_load(yaml_content)
except:
data = None
continue
if data:
export.append(data)
continue
pd.DataFrame(export).to_excel("上市公司ESG_knowledge.xlsx")
这部分是代码的主逻辑,主要执行以下操作:
- 首先初始化一个空列表
export
,用于存储最终要保存的数据。然后使用pandas
的read_excel
函数读取"./上市公司ESG报告大模型抽取非限定schema图谱.xlsx"
文件内容,并转换为列表形式(每个元素对应Excel中的一行数据)存储在yaml_better
变量中。 - 接着遍历
yaml_better
列表中的每一项,尝试加载YAML数据:- 如果该项的第二个元素(索引为1)中包含
"```yaml"
,说明可能是代码格式标记包裹的YAML内容,就调用str_yaml
函数提取出实际的YAML内容;否则直接将该项的第二个元素作为YAML内容。 - 然后使用
yaml.safe_load
函数尝试将提取的YAML内容转换为Python对象(data
),如果转换成功且data
不为空,则将其添加到export
列表中。
- 如果该项的第二个元素(索引为1)中包含
- 如果在上述
yaml.safe_load
过程中出现异常(except
块捕获),则执行以下操作:- 先打印异常信息和对应的原始文本内容(
yaml_better[1]
)。 - 构建一个包含异常信息以及原始文本等内容的字典
data
,将其作为参数传递给process_file
函数,尝试从远程接口获取重新整理后的内容。 - 得到返回内容后,再次调用
str_yaml
函数提取其中的YAML内容,然后尝试使用yaml.safe_load
将其转换为Python对象,如果转换成功且不为空,就添加到export
列表中,否则跳过继续下一轮循环。
- 先打印异常信息和对应的原始文本内容(
- 最后,将
export
列表转换为pandas
的DataFrame
对象,并保存为名为"上市公司ESG_knowledge.xlsx"
的Excel文件,实现了对读取的原始数据进行处理、可能的异常修复以及最终的数据保存操作。
总体来说,这段代码的功能大致是从一个Excel文件中读取可能包含YAML格式数据的内容,经过提取、解析、可能的远程协助修复异常等处理后,转换为特定知识图谱格式的数据结构,并最终保存到另一个Excel文件中,过程中涉及了和网络接口的交互以及多种文本处理和数据格式转换操作。不过代码中使用正则表达式提取YAML部分可能存在一定局限性,比如YAML格式稍微变化可能导致匹配不准确,而且在和远程接口交互部分如果接口不稳定等情况可能影响整体程序运行效果等问题。
基于转换好的图谱构建可视化
import networkx as nx
from pyvis.network import Network
# 初始化知识图谱
G = nx.DiGraph()
# 定义函数用于添加节点和边
def add_entities_and_relationships(graph, entities, relationships):
# 添加实体
for entity in entities:
graph.add_node(entity['name'], **entity)
# print(relationships)
# 添加关系
for relation in relationships:
source = ""
target = ""
if "subject" in relation:
source = relation.get('source', relation.get('subject'))
if "from" in relation:
source = relation.get('source', relation.get('from'))
if "source_event" in relation:
source = relation.get('source', relation.get('source_event'))
if "object" in relation:
target = relation.get('target', relation.get('object'))
if "target_event" in relation:
target = relation.get('target', relation.get('target_event'))
if "to" in relation:
target = relation.get('target', relation.get('to'))
predicate = ""
# print(relation)
if "relationship" in relation:
predicate = relation.get('relation_type', relation.get('relationship'))
if 'type' in relation:
predicate = relation.get('relation_type', relation.get('type'))
if 'relation_type' in relation:
predicate = relation.get('relation_type', relation.get('relation_type'))
# print(relation)
# print(source, target, predicate)
if source != "" and target != "" and predicate != "":
print(source, target, predicate)
if isinstance(target, list):
for target_one in target:
graph.add_edge(source, target_one, relation_type=predicate)
else:
graph.add_edge(source, target, relation_type=predicate)
# except:
# continue
# 定义数据
import pandas as pd
knowledge_glm = pd.read_excel("上市公司ESG_knowledge.xlsx").values.tolist()
# 添加所有数据到图谱
datasets = []
for i in knowledge_glm:
try:
data = eval(i[1])
except:
continue
datasets.append(data)
for dataset in datasets:
entities = dataset.get('entities', [])
relationships = dataset.get('relationships', dataset.get('relationships', []))
if relationships == []:
print(dataset)
add_entities_and_relationships(G, entities, relationships)
# 将 NetworkX 图转换为 Pyvis 图
net = Network(notebook=True, width="3000px", height="3750px", directed=True)
print(G.nodes())
# 添加节点和边
for node, data in G.nodes(data=True):
net.add_node(node, title=str(data), label=node)
print(G.edges())
for source, target, data in G.edges(data=True):
net.add_edge(source, target, title=data.get("relation_type", ""), label=data.get("relation_type", ""))
# 设置物理布局和其他参数
net.toggle_physics(True)
net.set_options("""
var options = {
"edges": {
"arrows": {
"to": {
"enabled": true
}
}
},
"physics": {
"barnesHut": {
"gravitationalConstant": -1600,
"springLength": 100,
"springConstant": 0.001
},
"minVelocity": 0.75
}
}
""")
# 保存为 HTML 文件
output_file = "上市公司ESG_knowledge_graph.html"
net.show(output_file)
print(f"知识图谱已保存为 {output_file}")
以下是对上述Python代码的详细解读:
1. 导入必要的库
import networkx as nx
from pyvis.network import Network
import pandas as pd
这段代码导入了三个主要的库:
networkx
(缩写为nx
):这是一个用于创建、操作和研究复杂网络结构(如图、有向图等)的Python库。它提供了丰富的图算法和数据结构来处理节点、边等图元素。pyvis.network
中的Network
类:pyvis
是一个用于在Python中可视化网络(图)的库,Network
类可以帮助将networkx
创建的图结构以交互式可视化的方式呈现出来,方便直观查看和分析。pandas
(缩写为pd
):用于数据处理和分析的强大库,在这里主要用于读取Excel文件中的数据。
2. 初始化知识图谱
G = nx.DiGraph()
这行代码创建了一个有向图对象G
,它将作为知识图谱的基础数据结构。后续会向这个有向图中添加节点和边来构建完整的知识图谱。
3. 定义添加节点和边的函数
def add_entities_and_relationships(graph, entities, relationships):
# 添加实体
for entity in entities:
graph.add_node(entity['name'], **entity)
# 添加关系
for relation in relationships:
source = ""
target = ""
if "subject" in relation:
source = relation.get('source', relation.get('subject'))
if "from" in relation:
source = relation.get('source', relation.get('from'))
if "source_event" in relation:
source = relation.get('source', relation.get('source_event'))
if "object" in relation:
target = relation.get('target', relation.get('object'))
if "target_event" in relation:
target = relation.get('target', relation.get('target_event'))
if "to" in relation:
target = relation.get('target', relation.get('to'))
predicate = ""
if "relationship" in relation:
predicate = relation.get('relation_type', relation.get('relationship'))
if 'type' in relation:
predicate = relation.get('relation_type', relation.get('type'))
if 'relation_type' in relation:
predicate = relation.get('relation_type', relation.get('relation_type'))
if source!= "" and target!= "" and predicate!= "":
print(source, target, predicate)
if isinstance(target, list):
for target_one in target:
graph.add_edge(source, target_one, relation_type=predicate)
else:
graph.add_edge(source, target, relation_type=predicate)
这个函数用于向给定的图(graph
参数,通常就是前面初始化的G
)中添加节点和边,接受三个参数:
graph
:要添加节点和边的networkx
图对象(例如前面创建的G
)。entities
:是一个包含实体信息的列表,每个元素(字典形式)代表一个实体,通过graph.add_node(entity['name'], **entity)
循环添加实体节点,其中**entity
表示将字典中的键值对作为节点的属性添加进去。relationships
:是一个包含关系信息的列表,每个元素(字典形式)描述了两个实体之间的关系。代码通过一系列if
语句从关系字典中获取源节点(source
)、目标节点(target
)以及关系类型(predicate
),然后根据target
是否为列表来决定是逐个添加边(如果target
是列表情况)还是直接添加一条边(target
不是列表情况),添加边时还设置了relation_type
属性为关系类型。
4. 读取数据并构建知识图谱
knowledge_glm = pd.read_excel("上市公司ESG_knowledge.xlsx").values.tolist()
datasets = []
for i in knowledge_glm:
try:
data = eval(i[1])
except:
continue
datasets.append(data)
for dataset in datasets:
entities = dataset.get('entities', [])
relationships = dataset.get('relationships', dataset.get('relationships', []))
if relationships == []:
print(dataset)
add_entities_and_relationships(G, entities, relationships)
- 首先,使用
pandas
的read_excel
函数读取名为"上市公司ESG_knowledge.xlsx"
的Excel文件,并将其内容转换为列表形式(每个元素对应Excel中的一行数据)存储在knowledge_glm
中。 - 接着,遍历
knowledge_glm
中的每一行数据,尝试使用eval
函数(这里要注意eval
使用存在一定安全风险,如果数据来源不可信可能导致代码注入等问题)将每行第二个元素(索引为1)解析为Python对象,并添加到datasets
列表中。 - 然后,对于每个
dataset
,从中获取实体信息(entities
)和关系信息(relationships
),如果关系信息不为空,就调用add_entities_and_relationships
函数将这些实体和关系添加到之前初始化的知识图谱G
中。
5. 将NetworkX
图转换为Pyvis
图并进行可视化设置
net = Network(notebook=True, width="3000px", height="3750px", directed=True)
for node, data in G.nodes(data=True):
net.add_node(node, title=str(data), label=node)
for source, target, data in G.edges(data=True):
net.add_edge(source, target, title=data.get("relation_type", ""), label=data.get("relation_type", ""))
- 首先创建了一个
pyvis
的Network
对象net
,设置了一些显示相关的参数,比如在notebook
环境中展示(notebook=True
)以及指定宽度和高度等,并且指明是有向图(directed=True
)。 - 然后通过循环遍历
G
(networkx
图)的节点和边,将节点及其属性添加到net
对象中,同时也将边及其关系类型属性添加到net
对象中,为后续可视化做准备。
6. 设置物理布局和其他可视化参数
net.toggle_physics(True)
net.set_options("""
var options = {
"edges": {
"arrows": {
"to": {
"enabled": true
}
}
},
"physics": {
"barnesHut": {
"gravitationalConstant": -1600,
"springLength": 100,
"springConstant": 0.001
},
"minVelocity": 0.75
}
}
""")
net.toggle_physics(True)
开启了物理模拟效果,使得节点在可视化界面中可以有动态的布局效果,类似有相互作用力影响它们的位置。net.set_options
函数用于设置可视化的一些详细配置选项,这里定义了一个JavaScript格式的配置字符串,设置了边的箭头显示(让箭头在边的终点显示)以及物理布局中barnesHut
算法相关的参数(如引力常数、弹簧长度、弹簧系数等)和最小速度等参数,来控制节点在可视化界面中的布局和动态行为。
7. 保存为HTML文件并输出提示信息
output_file = "上市公司ESG_knowledge_graph.html"
net.show(output_file)
print(f"知识图谱已保存为 {output_file}")
- 首先指定了要保存的HTML文件名称为
"上市公司ESG_knowledge_graph.html"
。 - 然后使用
net.show
方法将构建好并配置好的可视化知识图谱保存为指定的HTML文件,最后打印提示信息告知用户知识图谱已成功保存到相应文件中,方便后续在浏览器中打开查看可视化的知识图谱。
总体而言,这段代码实现了从读取数据(Excel文件中关于实体和关系的数据)构建知识图谱(基于networkx
库),再将其转换为适合可视化的形式(通过pyvis
库),最后保存为HTML文件以便直观查看和分析知识图谱的完整流程。不过代码中部分操作如使用eval
函数解析数据存在一定的安全隐患,可以考虑采用更安全的数据解析方式(比如使用ast.literal_eval
等)进行改进。
最终效果