Django不通过外键实现多表关联查询
测试环境
Win 10
Python 3.5.4
Django-2.0.13.tar.gz
需求
不通过外键,使用django orm语法实现多个表之间的关联查询,类似如下sql的查询效果:
SELECT tb_project_version.*, tb_sprint.name, tb_project.name
FROM tb_project_version
JOIN tb_sprint ON tb_sprint.id=tb_project_version.sprint_id
JOIN tb_project ON tb_project.id=tb_project_version.project_id
数据表Model设计
class Sprint(models.Model):
    """Product sprint (iteration) record stored in the ``tb_sprint`` table."""

    id = models.AutoField(primary_key=True, verbose_name='自增id')
    name = models.CharField(max_length=50, verbose_name='迭代名称')
    # ... remaining fields are omitted in this excerpt

    class Meta:
        db_table = 'tb_sprint'
        verbose_name = '产品迭代表'
        verbose_name_plural = verbose_name
class Project(models.Model):
    """Project record stored in the ``tb_project`` table."""

    id = models.AutoField(primary_key=True, verbose_name='自增id')
    name = models.CharField(max_length=50, verbose_name='项目名称')
    # ... remaining fields are omitted in this excerpt

    class Meta:
        db_table = 'tb_project'
        verbose_name = '项目表'
        verbose_name_plural = verbose_name
class ProjectVersion(models.Model):
    """Project version record stored in the ``tb_project_version`` table.

    The links to ``Project`` and ``Sprint`` are plain integer columns rather
    than ``ForeignKey`` fields, so the ORM knows nothing about the relations
    and any join has to be expressed by hand.
    """

    id = models.AutoField(primary_key=True, verbose_name='自增id')
    name = models.CharField(max_length=50, verbose_name='版本名称')
    # Id of the related tb_project row (no database-level constraint).
    project_id = models.IntegerField(verbose_name='关联的项目ID')
    # Id of the related tb_sprint row (no database-level constraint).
    sprint_id = models.IntegerField(verbose_name='关联的迭代ID')
    # ... remaining fields are omitted in this excerpt; they include the
    # ``is_delete`` column that the list views filter on.

    class Meta:
        db_table = 'tb_project_version'
        verbose_name = '项目版本表'
        verbose_name_plural = verbose_name
实现方法1-通过extra api函数实现
如下,带背景色部分的内容为核心
serializers.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework import serializers
from backend.models import ProjectVersion
# ProjectVersion model 序列化器
class ProjectVersionSerializer(serializers.ModelSerializer):
    """Serialize ``ProjectVersion`` rows together with two extra name columns.

    ``project`` and ``sprint`` are not model fields. They are expected to be
    attached to every instance by the queryset (for example through
    ``extra(select=...)``), so they have to be declared here explicitly or
    they would be missing from the serialized output.
    """

    # Explicitly declared fields are not affected by ``Meta.read_only_fields``
    # (it is applied through ``extra_kwargs``, which DRF ignores for declared
    # fields), so the read-only flag must be set on the field itself.
    # ``required=True`` cannot be combined with ``read_only=True``.
    project = serializers.CharField(read_only=True)
    sprint = serializers.CharField(read_only=True)

    class Meta:
        model = ProjectVersion
        fields = '__all__'
说明:如上,如果使用了django rest framework序列化,则需要为其序列化器添加model中不存在的字段,否则序列化后还是看不到对应的目标字段
project_version_views.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
__author__ = '授客'
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from backend.models import ProjectVersion
from backend.serializers import ProjectVersionSerializer
class ProjectVersionListAPIView(APIView):
    """Project view - version management."""

    def get(self, request, format=None):
        """Return one page of project versions and the total match count.

        Query parameters read from ``request.GET``: ``pageSize``, ``pageNo``,
        ``name`` (prefix filter), ``projectId`` and ``sort`` (comma-separated
        ``order_by`` terms, defaulting to ``-id``).  Any exception is reported
        as an HTTP 500 response carrying the error text.
        """
        payload = {}
        try:
            query = request.GET
            per_page = int(query.get('pageSize'))
            page = int(query.get('pageNo'))
            name_prefix = query.get('name')
            project_id = query.get('projectId')
            sort_param = query.get('sort')
            ordering = sort_param.split(',') if sort_param else ['-id']

            offset = (page - 1) * per_page
            stop = offset + per_page

            conditions = {'is_delete': 0}
            if name_prefix:
                conditions['name__startswith'] = name_prefix
            if project_id:
                conditions['project_id'] = project_id

            # Correlated subqueries expose the related names as the extra
            # ``project`` / ``sprint`` attributes of every returned instance.
            extra_columns = {
                'project': 'SELECT tb_project.name FROM tb_project WHERE tb_project.id = tb_project_version.project_id',
                'sprint': 'SELECT tb_sprint.name FROM tb_sprint WHERE tb_sprint.id = tb_project_version.sprint_id',
            }
            queryset = ProjectVersion.objects.filter(**conditions).extra(select=extra_columns)

            page_rows = queryset.order_by(*ordering)[offset:stop]
            serialized = ProjectVersionSerializer(page_rows, many=True).data
            total = queryset.count()

            payload['msg'] = '获取成功'
            payload['success'] = True
            payload['data'] = {'rows': serialized, 'total': total}
            return Response(payload, status.HTTP_200_OK)
        except Exception as exc:
            payload['msg'] = str(exc)
            payload['success'] = False
            return Response(payload, status.HTTP_500_INTERNAL_SERVER_ERROR)
说明:
projectVersions.order_by(*sort_list)[startIndex:endIndex]
等价于
SELECT (SELECT tb_project.name FROM tb_project WHERE tb_project.id = tb_project_version.project_id) AS `project`,
(SELECT tb_sprint.name FROM tb_sprint WHERE tb_sprint.id = tb_project_version.sprint_id) AS `sprint`,
`tb_project_version`.`id`,
`tb_project_version`.`name`,
`tb_project_version`.`project_id`,
`tb_project_version`.`sprint_id`,
...略
FROM `tb_project_version`
WHERE `tb_project_version`.`is_delete` = 0
ORDER BY `tb_project_version`.`id` DESC LIMIT 10 # 假设startIndex=0, endIndex=10
projectVersions.count()
等价于
SELECT COUNT(*) AS `__count` FROM `tb_project_version`
WHERE `tb_project_version`.`is_delete` = 0
上述查询代码的另一种实现
# Alternative: list the related tables in ``tables`` and join them in
# ``where``.  This is an implicit INNER JOIN, so unlike the subquery variant
# a version whose project_id or sprint_id has no matching row is left out of
# both the page and the count.
projectVersions = ProjectVersion.objects.filter(**filters).extra(
    select={
        'project': 'tb_project.name',
        'sprint': 'tb_sprint.name',
    },
    tables=['tb_project', 'tb_sprint'],
    where=[
        'tb_project.id=tb_project_version.project_id',
        'tb_sprint.id = tb_project_version.sprint_id',
    ],
)
rows = projectVersions.order_by(*sort_list)[startIndex:endIndex]
rows = ProjectVersionSerializer(rows, many=True).data
total = projectVersions.count()
projectVersions.order_by(*sort_list)[startIndex:endIndex]
等价于
SELECT (tb_project.name) AS `project`,
(tb_sprint.name) AS `sprint`,
`tb_project_version`.`id`,
`tb_project_version`.`name`,
`tb_project_version`.`project_id`,
`tb_project_version`.`sprint_id`,
...略
FROM `tb_project_version` , `tb_project` , `tb_sprint`
WHERE `tb_project_version`.`is_delete` = 0 AND (tb_project.id=tb_project_version.project_id) AND (tb_sprint.id = tb_project_version.sprint_id)
ORDER BY `tb_project_version`.`id` DESC LIMIT 10 # 假设startIndex=0, endIndex=10
projectVersions.count()
等价于
SELECT COUNT(*) AS `__count` FROM `tb_project_version` , `tb_project` , `tb_sprint` WHERE `tb_project_version`.`is_delete` = 0 AND (tb_project.id=tb_project_version.project_id) AND (tb_sprint.id = tb_project_version.sprint_id)
实现方法2-通过django rest framework实现
serializers.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework import serializers
from backend.models import ProjectVersion
from backend.models import Sprint
from backend.models import Project
# ProjectVersion model 序列化器
class ProjectVersionSerializer(serializers.ModelSerializer):
    """Serialize ``ProjectVersion`` rows plus the related project/sprint names.

    The names are looked up per instance by id because the model stores
    ``project_id`` / ``sprint_id`` as plain integers, not foreign keys.
    """

    project = serializers.SerializerMethodField()
    sprint = serializers.SerializerMethodField()

    def get_project(self, obj):
        """Return the name of the project referenced by ``obj``, or ``'--'``.

        :param obj: the ProjectVersion instance being serialized
        """
        matched = Project.objects.filter(id=obj.project_id).first()
        return matched.name if matched else '--'

    def get_sprint(self, obj):
        """Return the name of the sprint referenced by ``obj``, or ``'--'``.

        :param obj: the ProjectVersion instance being serialized
        """
        matched = Sprint.objects.filter(id=obj.sprint_id).first()
        return matched.name if matched else '--'

    class Meta:
        model = ProjectVersion
        fields = '__all__'
        read_only_fields = ['project', 'sprint']
project_version_views.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
__author__ = '授客'
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from backend.models import ProjectVersion
from backend.serializers import ProjectVersionSerializer
class ProjectVersionListAPIView(APIView):
    """Project view - version management."""

    def get(self, request, format=None):
        """Return one page of project versions and the total match count.

        Query parameters read from ``request.GET``: ``pageSize``, ``pageNo``,
        ``name`` (prefix filter), ``projectId`` and ``sort`` (comma-separated
        ``order_by`` terms, defaulting to ``-id``).  Any exception is reported
        as an HTTP 500 response carrying the error text.
        """
        payload = {}
        try:
            query = request.GET
            per_page = int(query.get('pageSize'))
            page = int(query.get('pageNo'))
            name_prefix = query.get('name')
            project_id = query.get('projectId')
            sort_param = query.get('sort')
            ordering = sort_param.split(',') if sort_param else ['-id']

            offset = (page - 1) * per_page
            stop = offset + per_page

            conditions = {'is_delete': 0}
            if name_prefix:
                conditions['name__startswith'] = name_prefix
            if project_id:
                conditions['project_id'] = project_id

            # Querysets are lazy: the page slice and the count below each
            # issue their own SQL statement from this shared base.
            matching = ProjectVersion.objects.filter(**conditions)
            page_rows = matching.order_by(*ordering)[offset:stop]
            serialized = ProjectVersionSerializer(page_rows, many=True).data
            total = matching.count()

            payload['msg'] = '获取成功'
            payload['success'] = True
            payload['data'] = {'rows': serialized, 'total': total}
            return Response(payload, status.HTTP_200_OK)
        except Exception as exc:
            payload['msg'] = str(exc)
            payload['success'] = False
            return Response(payload, status.HTTP_500_INTERNAL_SERVER_ERROR)
实现方法3-通过raw函数执行原生sql
以下是项目中的一个实例,和本文上述内容没有任何关联,关键部分背景已着色,笔者偷懒,不做过多解释了,简单说下下面这段代码的用途:
主要是实现类似以下查询,获取指定分页的数据以及满足条件的记录总数。
SELECT tb_project.*, project_name_associated, project_id_associated, platform FROM tb_project
LEFT JOIN tb_project_associated ON tb_project.id=tb_project_associated.project_id
ORDER BY id DESC
LIMIT 0,10
import logging
import re

from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

from backend.models import Project
from backend.serializers import ProjectSerializer
logger = logging.getLogger('mylogger')
class ProjectListAPIView(APIView):
    """Project view - project management - project list.

    Returns one page of non-deleted ``tb_project`` rows LEFT JOINed with
    ``tb_project_associated``, together with the number of joined rows that
    satisfy the same filters.
    """

    # FROM/JOIN clause shared by the count query and the page query so that
    # both always describe the same row set.
    _FROM_SQL = (
        'FROM tb_project LEFT JOIN tb_project_associated '
        'ON tb_project.id=tb_project_associated.project_id '
    )

    # One ORDER BY term: a column name, optionally table-qualified, followed
    # by an optional direction.  ORDER BY cannot be passed as a bound
    # parameter, so the request value is validated against this pattern.
    _ORDER_TERM = re.compile(
        r'[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)?'
        r'(?:\s+(?:asc|desc))?',
        re.IGNORECASE,
    )

    # Date/time columns that are rendered as strings in the response.
    _TIME_FORMATS = (
        ('create_time', '%Y-%m-%d %H:%M:%S'),
        ('update_time', '%Y-%m-%d %H:%M:%S'),
        ('begin_time', '%Y-%m-%d'),
        ('end_time', '%Y-%m-%d'),
    )

    @classmethod
    def _build_order_by(cls, sort):
        """Return a safe ORDER BY expression for the ``sort`` request value.

        :param sort: comma-separated ``column [asc|desc]`` terms, or a falsy
            value to use the default ``id desc``
        :raises ValueError: if any term is not a plain column reference
        """
        if not sort:
            return 'id desc'
        terms = [term.strip() for term in sort.split(',')]
        for term in terms:
            if not cls._ORDER_TERM.fullmatch(term):
                raise ValueError('invalid sort expression: %s' % sort)
        return ', '.join(terms)

    def get(self, request, format=None):
        """Return one page of projects and the total number of matches.

        Query parameters read from ``request.GET``: ``pageSize``, ``pageNo``,
        ``name`` (substring filter), ``status`` and ``sort``.  Any exception
        is logged and reported as an HTTP 500 response with the error text.
        """
        result = {}
        try:
            params = request.GET
            page_size = int(params.get('pageSize'))
            page_no = int(params.get('pageNo'))
            name = params.get('name')
            project_status = params.get('status')
            order_by = self._build_order_by(params.get('sort'))
            start_index = (page_no - 1) * page_size

            # User-supplied values are passed as bound parameters instead of
            # being formatted into the SQL text.
            where = 'WHERE tb_project.is_delete=0 '
            sql_params = []
            if name:
                where += 'AND locate(%s, name) '
                sql_params.append(name)
            if project_status:
                where += 'AND status=%s '
                sql_params.append(project_status)

            # raw() needs the primary key column in the result set; it is
            # aggregated so the statement has no non-aggregated column
            # (presumably required when the database enforces
            # ONLY_FULL_GROUP_BY - confirm against the deployment).  The
            # WHERE clause is applied so the total matches the page query.
            count_sql = (
                'SELECT MIN(tb_project.id) AS id, COUNT(1) AS count '
                + self._FROM_SQL + where
            )
            count_rows = list(Project.objects.raw(count_sql, sql_params))
            total = getattr(count_rows[0], 'count', 0) if count_rows else 0

            page_sql = (
                'SELECT tb_project.*,project_name_associated, project_id_associated, platform '
                + self._FROM_SQL + where
                + 'ORDER BY ' + order_by + ' LIMIT %s,%s '
            )
            page_rows = Project.objects.raw(
                page_sql, sql_params + [start_index, page_size])

            rows = []
            for item in page_rows:
                # Work on a copy so the model instance itself stays intact.
                row = dict(item.__dict__)
                row.pop('_state', None)
                for key, fmt in self._TIME_FORMATS:
                    value = row.get(key)
                    # NULL date columns are passed through unformatted.
                    if value is not None:
                        row[key] = value.strftime(fmt)
                rows.append(row)

            result['msg'] = '获取成功'
            result['success'] = True
            result['data'] = {}
            result['data']['rows'] = rows
            result['data']['total'] = total
            return Response(result, status.HTTP_200_OK)
        except Exception as e:
            logger.exception('failed to query project list')
            result['msg'] = '%s' % e
            result['success'] = False
            return Response(result, status.HTTP_500_INTERNAL_SERVER_ERROR)
参考链接
https://docs.djangoproject.com/en/2.0/ref/models/querysets/#django.db.models.query.QuerySet.extra