前提条件
- 先往Neo4j 里,准备数据 参考:https://www.cnblogs.com/vipsoft/p/17631347.html#创建传承人
- 搭建 FastAPI 项目:https://www.cnblogs.com/vipsoft/p/17684079.html
改造
utils/neo4j_provider.py
增加了暴露给外面调用的属性,同时提供了同步和异步执行的驱动
#!/usr/bin/python3 import os from neo4j import GraphDatabase, AsyncGraphDatabase, basic_auth, Driver, AsyncDriver from settings import settings # Neo4j 数据库操作类 class Neo4jProvider: """创建 Neo4j 数据库连接""" def __init__(self) -> None: # 获取环境变量值,如果没有就返回默认值 self.url = settings.NEO4J_URI self.username = settings.NEO4J_USER self.password = settings.NEO4J_PASSWORD self.neo4j_version = settings.NEO4J_VERSION self.database = settings.NEO4J_DATABASE self.port = int(settings.NEO4J_PORT) # 同步驱动 def driver(self) -> Driver: return GraphDatabase.driver(self.url, auth=basic_auth(self.username, self.password)) # 异步驱动 def async_driver(self) -> AsyncDriver: return AsyncGraphDatabase.driver(self.url, auth=basic_auth(self.username, self.password)) # 同步驱动。暴露给外面调用 driver = Neo4jProvider().driver() # 异步驱动。暴露给外面调用 asyncDriver = Neo4jProvider().async_driver()
routers/node_router.py
添加一个查询数据的接口方法
#!/usr/bin/python3 import logging from fastapi import APIRouter, status from fastapi.responses import JSONResponse from utils.neo4j_provider import asyncDriver from settings import settings router = APIRouter() # 定义一个根路由 @router.get("/add") def add_node(): # TODO 往 neo4j 里创建新的节点 data = { 'code': 0, 'message': '', 'data': 'add success' } return JSONResponse(content=data, status_code=status.HTTP_200_OK) @router.route("/search") async def get_search(q: str = None): if q is None: return [] cql = (""" MATCH (p:Person) WHERE p.name CONTAINS $name RETURN p """) records, _, _ = await asyncDriver.execute_query( cql, name=q.query_params['name'], # 将参数 q 传给cql 里的 $name 变量 -- Python 很聪明,直接 q.query_params 就可以获取参数值了 database_=settings.NEO4J_DATABASE, routing_="r", ) for record in records: # 打印出 record 属性 logging.info("%s, %s", record["p"]["name"], record["p"]["generation"]) # 转成 json data = [serialize_person(record["p"]) for record in records] return JSONResponse(content=data, status_code=status.HTTP_200_OK) def serialize_person(person): return { "id": person["id"], "name": person["name"], "generation": person["generation"], "votes": person.get("votes", 0) }
运行效果
http://127.0.0.1:8000/api/node/search?name=%E9%99%88%E9%95%BF%E5%85%B4