Django不通过外键实现多表关联查询

简介: Django不通过外键实现多表关联查询

Django不通过外键实现多表关联查询



 

测试环境

Win 10

 

Python 3.5.4

 

Django-2.0.13.tar.gz

 

 

需求

不通过外键,使用django orm语法实现多个表之间的关联查询,类似如下sql的查询效果:

SELECT tb_project_version.*, tb_sprint.name, tb_project.name

FROM tb_project_version

JOIN tb_sprint ON tb_sprint.id=tb_project_version.sprint_id

JOIN tb_project ON tb_project.id=tb_project_version.project_id

 

数据表Model设计

 

class Sprint(models.Model):

   id = models.AutoField(primary_key=True, verbose_name='自增id')

   name = models.CharField(max_length=50, verbose_name='迭代名称')

   ...略  

 

   class Meta:

       db_table = 'tb_sprint'

       verbose_name = '产品迭代表'

       verbose_name_plural = verbose_name

 

class Project(models.Model):

   id = models.AutoField(primary_key=True, verbose_name='自增id')

   name = models.CharField(max_length=50, verbose_name='项目名称')

   ...略

 

   class Meta:

       db_table = 'tb_project'

       verbose_name = '项目表'

       verbose_name_plural = verbose_name

 

 

class ProjectVersion(models.Model):

   id = models.AutoField(primary_key=True, verbose_name='自增id')

   name = models.CharField(max_length=50, verbose_name='版本名称')

   project_id = models.IntegerField(verbose_name='关联的项目ID')

   sprint_id = models.IntegerField(verbose_name='关联的迭代ID')

   ...略

 

   class Meta:

       db_table = 'tb_project_version'

       verbose_name = '项目版本表'

       verbose_name_plural = verbose_name

 

实现方法1-通过extra api函数实现

 

如下,带背景色部分的内容为核心

 

serializers.py

#!/usr/bin/env python

# -*- coding:utf-8 -*-

 

from rest_framework import serializers

from backend.models import ProjectVersion

 

# ProjectVersion model 序列化器

class ProjectVersionSerializer(serializers.ModelSerializer):

   project = serializers.CharField(required=True)

   sprint = serializers.CharField(required=True)

 

   class Meta:

       model = ProjectVersion

       fields = '__all__'

       read_only_fields = ['project', 'sprint']

 

说明:如上,如果使用了django rest framework序列化,则需要为其序列化器添加model中不存在的字段,否则序列化后还是看不到对应的目标字段

 

project_version_views.py

#!/usr/bin/env python

# -*- coding:utf-8 -*-

 

__author__ = '授客'

 

from rest_framework.views import APIView

from rest_framework.response import Response

from rest_framework import status

 

from backend.models import ProjectVersion

from backend.serializers import ProjectVersionSerializer

 

 

 

class ProjectVersionListAPIView(APIView):

   '''

   项目视图-版本管理

   '''

   # 查询列表数据

   def get(self, request, format=None):

       result = {}

       try:

           params =  request.GET

           page_size = int(params.get('pageSize'))

           page_no = int(params.get('pageNo'))

           name = params.get('name')

           project_id = params.get('projectId')

           sort = params.get('sort')

           if sort:

               sort_list = sort.split(',')

           else:

               sort_list = ['-id']

 

           startIndex = (page_no - 1) * page_size

           endIndex = startIndex + page_size

           filters = {'is_delete':0}

           if name:

               filters['name__startswith'] = name

           if project_id:

               filters['project_id'] = project_id

           projectVersions = ProjectVersion.objects.filter(**filters).extra(

               select={'project': 'SELECT tb_project.name FROM tb_project WHERE tb_project.id = tb_project_version.project_id',

                       'sprint':'SELECT tb_sprint.name FROM tb_sprint WHERE tb_sprint.id = tb_project_version.sprint_id'},

           )

rows = projectVersions.order_by(*sort_list)[startIndex:endIndex]

           rows = ProjectVersionSerializer(rows, many=True).data

           total = projectVersions.count()

 

           result['msg'] =  '获取成功'

           result['success'] =  True

           result['data'] = {}

           result['data']['rows'] = rows

           result['data']['total'] = total

           return Response(result, status.HTTP_200_OK)

       except Exception as e:

           result['msg'] =  '%s' % e

           result['success'] =  False

           return Response(result, status.HTTP_500_INTERNAL_SERVER_ERROR)

 

说明:

projectVersions.order_by(*sort_list)[startIndex:endIndex]

 

等价于

 

SELECT (SELECT tb_project.name FROM tb_project WHERE tb_project.id = tb_project_version.project_id) AS `project`,

(SELECT tb_sprint.name FROM tb_sprint WHERE tb_sprint.id = tb_project_version.sprint_id) AS `sprint`,

`tb_project_version`.`id`,

`tb_project_version`.`name`,

`tb_project_version`.`project_id`,

`tb_project_version`.`sprint_id`,

...略

FROM `tb_project_version`

WHERE `tb_project_version`.`is_delete` = 0

ORDER BY `tb_project`.`id` DESC LIMIT 10 # 假设startIndex=0, endIndex=10

 

projectVersions.count()

等价于

SELECT COUNT(*) AS `__count` FROM `tb_project_version`

WHERE `tb_project_version`.`is_delete` = 0

 

 

 

上述查询代码的另一种实现

projectVersions =  Project.objects.filter(**filters).extra(

select={'project:'tb_project.name',

       'sprint':' tb_sprint.name',

tables=['tb_project', 'tb_sprint'],

where=['tb_project.id=tb_project_version.project_id', 'tb_sprint.id = tb_project_version.sprint_id']

)

rows = projectVersions.order_by(*sort_list)[startIndex:endIndex]

rows = ProjectVersionSerializer(rows, many=True).data

total = projectVersions.count()

 

 

projectVersions.order_by(*sort_list)[startIndex:endIndex]

 

等价于

 

SELECT (tb_project.name) AS `project`,

(tb_sprint.name) AS `sprint`,

`tb_project_version`.`id`,

`tb_project_version`.`name`,

`tb_project_version`.`project_id`,

`tb_project_version`.`sprint_id`,

...略

FROM `tb_project_version`

WHERE `tb_project_version`.`is_delete` = 0 AND (tb_project.id=tb_project_version.project_id) AND (tb_sprint.id = tb_project_version.sprint_id)

ORDER BY `tb_project`.`id` DESC LIMIT 10 # 假设startIndex=0, endIndex=10

 

 

projectVersions.count()

等价于

SELECT COUNT(*) AS `__count` FROM `tb_project_version` , `tb_project` , `tb_sprint` WHERE `tb_project_version`.`is_delete` = 0 AND (tb_project.id=tb_project_version.project_id) AND (tb_sprint.id = tb_project_version.sprint_id)

 

 

实现方法2-通过django rest framework实现

serializers.py

#!/usr/bin/env python

# -*- coding:utf-8 -*-

 

from rest_framework import serializers

from backend.models import ProjectVersion

from backend.models import Sprint

from backend.models import Project

 

 

# ProjectVersion model 序列化器

class ProjectVersionSerializer(serializers.ModelSerializer):

   project = serializers.SerializerMethodField()

   sprint = serializers.SerializerMethodField()

 

   def get_sprint(self, obj):

       """

       :param obj: 当前ProjectVersion的实例

       """

       current_project_version = obj

       sprint = Sprint.objects.filter(id=current_project_version.sprint_id).first()

       if sprint:

           return sprint.name

       else:

           return '--'

 

   def get_project(self, obj):

       """

       :param obj: 当前ProjectVersion的实例

       """

       current_project_version = obj

       project = Project.objects.filter(id=current_project_version.project_id).first()

       if project:

           return project.name

       else:

           return '--'

 

   class Meta:

       model = ProjectVersion

       fields = '__all__'

       read_only_fields = ['project', 'sprint']

 

project_version_views.py

#!/usr/bin/env python

# -*- coding:utf-8 -*-

 

__author__ = '授客'

 

from rest_framework.views import APIView

from rest_framework.response import Response

from rest_framework import status

 

from backend.models import ProjectVersion

from backend.serializers import ProjectVersionSerializer

 

 

 

class ProjectVersionListAPIView(APIView):

   '''

   项目视图-版本管理

   '''

   # 查询列表数据

   def get(self, request, format=None):

       result = {}

       try:

           params =  request.GET

           page_size = int(params.get('pageSize'))

           page_no = int(params.get('pageNo'))

           name = params.get('name')

           project_id = params.get('projectId')

           sort = params.get('sort')

           if sort:

               sort_list = sort.split(',')

           else:

               sort_list = ['-id']

 

           startIndex = (page_no - 1) * page_size

           endIndex = startIndex + page_size

           filters = {'is_delete':0}

           if name:

               filters['name__startswith'] = name

           if project_id:

               filters['project_id'] = project_id

           rows = ProjectVersion.objects.filter(**filters).order_by(*sort_list)[startIndex:endIndex]

           rows = ProjectVersionSerializer(rows, many=True).data

           total = ProjectVersion.objects.filter(**filters).count()

 

           result['msg'] =  '获取成功'

           result['success'] =  True

           result['data'] = {}

           result['data']['rows'] = rows

           result['data']['total'] = total

           return Response(result, status.HTTP_200_OK)

       except Exception as e:

           result['msg'] =  '%s' % e

           result['success'] =  False

           return Response(result, status.HTTP_500_INTERNAL_SERVER_ERROR)

 

方法3-通过raw函数执行原生sql

以下是项目中的一个实例,和本文上述内容没有任何关联,关键部分背景已着色,笔者偷懒,不做过多解释了,简单说下下面这段代码对用途:

 

主要是实现类似以下查询,获取指定分页对数据以及满足条件的记录记录总数。

 

SELECT tb_project.*, project_name_associated, project_id_associated, platform FROM tb_project

LEFT JOIN tb_project_associated ON tb_project.id=tb_project_associated.project_id

ORDER BY id DESC

LIMIT 0,10

 

 

from rest_framework.views import APIView

from rest_framework.response import Response

from rest_framework import status

from backend.models import Project

from backend.serializers import ProjectSerializer

 

import logging

 

 

logger = logging.getLogger('mylogger')

 

class ProjectListAPIView(APIView):

   '''

   项目视图-项目管理-项目列表

   '''

 

   # 查询列表数据

   def get(self, request, format=None):

       result = {}

       try:

           params =  request.GET

           page_size = int(params.get('pageSize'))

           page_no = int(params.get('pageNo'))

           name = params.get('name')

           project_status = params.get('status')

           sort = params.get('sort')

 

           order_by = 'id desc'

           if sort:

               order_by = sort

         

           startIndex = (page_no - 1) * page_size

 

           where = 'WHERE tb_project.is_delete=0 '

           filters = {'is_delete':0}

           if name:

               filters['name__startswith'] = name

               where += 'AND locate("%s", name) ' % name

 

           if project_status:

              where += "AND status='%s'" % project_status

 

           sql = 'SELECT tb_project.id, COUNT(1) AS count FROM tb_project LEFT JOIN tb_project_associated ON tb_project.id=tb_project_associated.project_id '

           query_rows = Project.objects.raw(sql)

           total = query_rows[0].__dict__.get('count') if query_rows else 0

 

           sql =  'SELECT tb_project.*,project_name_associated, project_id_associated, platform FROM tb_project LEFT JOIN tb_project_associated ON tb_project.id=tb_project_associated.project_id ' \

                   '%s ORDER BY %s ' \

                   'LIMIT %s,%s ' % (where,order_by, startIndex, page_size)

           query_rows = Project.objects.raw(sql)

           rows = []

           for item in query_rows:

               item.__dict__.pop('_state')

               item.__dict__['create_time'] = item.__dict__['create_time'].strftime('%Y-%m-%d %H:%M:%S')

               item.__dict__['update_time'] = item.__dict__['update_time'].strftime('%Y-%m-%d %H:%M:%S')

               item.__dict__['begin_time'] = item.__dict__['begin_time'].strftime('%Y-%m-%d')

               item.__dict__['end_time'] = item.__dict__['end_time'].strftime('%Y-%m-%d')

               rows.append(item.__dict__)

       

           result['msg'] =  '获取成功'

           result['success'] =  True

           result['data'] = {}

           result['data']['rows'] = rows

           result['data']['total'] = total

           return Response(result, status.HTTP_200_OK)

       except Exception as e:

           result['msg'] =  '%s' % e

           result['success'] =  False

           return Response(result, status.HTTP_500_INTERNAL_SERVER_ERROR)

 

 

参考链接

https://docs.djangoproject.com/en/1.11/ref/models/querysets/#django.db.models.query.QuerySet.extra

https://www.jianshu.com/p/973971880da7

目录
相关文章
|
1月前
|
SQL Go 数据库
【速存】深入理解Django ORM:编写高效的数据库查询
【速存】深入理解Django ORM:编写高效的数据库查询
59 0
|
1月前
|
SQL 前端开发 Python
基于python-django的neo4j人民的名义关系图谱查询系统
基于python-django的neo4j人民的名义关系图谱查询系统
32 0
|
2月前
|
数据库 Python
django中数据库外键可以自定义名称吗
django中数据库外键可以自定义名称吗
|
4月前
|
SQL 数据库 Python
Django框架数据库ORM查询操作(6)
【7月更文挑战第6天】```markdown Django ORM常用数据库操作:1) 查询所有数据2) 根据ID查询 3) 精确查询 4) 分页排序
75 1
|
5月前
|
JSON 数据库 数据格式
使用 Django Q 对象构建复杂查询条件
通过本文示例,我们展示了如何使用Django的Q对象来构建复杂的查询条件,以及如何实现分页功能。Q对象的强大之处在于它能够轻松地组合多个查询条件,支持“与”、“或”关系,极大地提高了查询的灵活性和可读性。希望本文对你在实际项目中使用Django ORM构建复杂查询有所帮助。
|
5月前
|
存储 SQL 数据处理
Django ORM实战:模型字段与元选项配置,以及链式过滤与QF查询详解
Django ORM实战:模型字段与元选项配置,以及链式过滤与QF查询详解
|
5月前
|
前端开发 数据库 Python
Python Django项目下的分页和筛选查询
在Django中实现分页功能,视图函数通过`Paginator`处理数据,每页显示10条记录。URL配置支持带参数和不带参数的分页请求。前端模板使用for循环展示分页数据,包括商品信息和状态按钮,并利用分页组件导航。筛选查询视图根据GET请求的`state`参数过滤上架或下架产品,同样实现分页功能。前端添加状态选择下拉框,分页链接携带查询参数`state`确保筛选状态在翻页时保持。
|
SQL 缓存 数据库
10 Django模型 - 模型查询
10 Django模型 - 模型查询
62 0
|
6月前
|
SQL 缓存 数据库
在Python Web开发过程中:数据库与缓存,如何使用ORM(例如Django ORM)执行查询并优化查询性能?
在Python Web开发中,使用ORM如Django ORM能简化数据库操作。为了优化查询性能,可以:选择合适索引,避免N+1查询(利用`select_related`和`prefetch_related`),批量读取数据(`iterator()`),使用缓存,分页查询,适时使用原生SQL,优化数据库配置,定期优化数据库并监控性能。这些策略能提升响应速度和用户体验。
58 0
|
缓存 数据库 Python
django这些查询技巧你会了吗?
django这些查询技巧你会了吗?