Django不通过外键实现多表关联查询

简介: Django不通过外键实现多表关联查询

Django不通过外键实现多表关联查询



 

测试环境

Win 10

 

Python 3.5.4

 

Django-2.0.13.tar.gz

 

 

需求

不通过外键,使用django orm语法实现多个表之间的关联查询,类似如下sql的查询效果:

SELECT tb_project_version.*, tb_sprint.name, tb_project.name

FROM tb_project_version

JOIN tb_sprint ON tb_sprint.id=tb_project_version.sprint_id

JOIN tb_project ON tb_project.id=tb_project_version.project_id

 

数据表Model设计

 

class Sprint(models.Model):

   id = models.AutoField(primary_key=True, verbose_name='自增id')

   name = models.CharField(max_length=50, verbose_name='迭代名称')

   ...略  

 

   class Meta:

       db_table = 'tb_sprint'

       verbose_name = '产品迭代表'

       verbose_name_plural = verbose_name

 

class Project(models.Model):

   id = models.AutoField(primary_key=True, verbose_name='自增id')

   name = models.CharField(max_length=50, verbose_name='项目名称')

   ...略

 

   class Meta:

       db_table = 'tb_project'

       verbose_name = '项目表'

       verbose_name_plural = verbose_name

 

 

class ProjectVersion(models.Model):

   id = models.AutoField(primary_key=True, verbose_name='自增id')

   name = models.CharField(max_length=50, verbose_name='版本名称')

   project_id = models.IntegerField(verbose_name='关联的项目ID')

   sprint_id = models.IntegerField(verbose_name='关联的迭代ID')

   ...略

 

   class Meta:

       db_table = 'tb_project_version'

       verbose_name = '项目版本表'

       verbose_name_plural = verbose_name

 

实现方法1-通过extra api函数实现

 

如下,带背景色部分的内容为核心

 

serializers.py

#!/usr/bin/env python

# -*- coding:utf-8 -*-

 

from rest_framework import serializers

from backend.models import ProjectVersion

 

# ProjectVersion model 序列化器

class ProjectVersionSerializer(serializers.ModelSerializer):

   project = serializers.CharField(required=True)

   sprint = serializers.CharField(required=True)

 

   class Meta:

       model = ProjectVersion

       fields = '__all__'

       read_only_fields = ['project', 'sprint']

 

说明:如上,如果使用了django rest framework序列化,则需要为其序列化器添加model中不存在的字段,否则序列化后还是看不到对应的目标字段

 

project_version_views.py

#!/usr/bin/env python

# -*- coding:utf-8 -*-

 

__author__ = '授客'

 

from rest_framework.views import APIView

from rest_framework.response import Response

from rest_framework import status

 

from backend.models import ProjectVersion

from backend.serializers import ProjectVersionSerializer

 

 

 

class ProjectVersionListAPIView(APIView):

   '''

   项目视图-版本管理

   '''

   # 查询列表数据

   def get(self, request, format=None):

       result = {}

       try:

           params =  request.GET

           page_size = int(params.get('pageSize'))

           page_no = int(params.get('pageNo'))

           name = params.get('name')

           project_id = params.get('projectId')

           sort = params.get('sort')

           if sort:

               sort_list = sort.split(',')

           else:

               sort_list = ['-id']

 

           startIndex = (page_no - 1) * page_size

           endIndex = startIndex + page_size

           filters = {'is_delete':0}

           if name:

               filters['name__startswith'] = name

           if project_id:

               filters['project_id'] = project_id

           projectVersions = ProjectVersion.objects.filter(**filters).extra(

               select={'project': 'SELECT tb_project.name FROM tb_project WHERE tb_project.id = tb_project_version.project_id',

                       'sprint':'SELECT tb_sprint.name FROM tb_sprint WHERE tb_sprint.id = tb_project_version.sprint_id'},

           )

rows = projectVersions.order_by(*sort_list)[startIndex:endIndex]

           rows = ProjectVersionSerializer(rows, many=True).data

           total = projectVersions.count()

 

           result['msg'] =  '获取成功'

           result['success'] =  True

           result['data'] = {}

           result['data']['rows'] = rows

           result['data']['total'] = total

           return Response(result, status.HTTP_200_OK)

       except Exception as e:

           result['msg'] =  '%s' % e

           result['success'] =  False

           return Response(result, status.HTTP_500_INTERNAL_SERVER_ERROR)

 

说明:

projectVersions.order_by(*sort_list)[startIndex:endIndex]

 

等价于

 

SELECT (SELECT tb_project.name FROM tb_project WHERE tb_project.id = tb_project_version.project_id) AS `project`,

(SELECT tb_sprint.name FROM tb_sprint WHERE tb_sprint.id = tb_project_version.sprint_id) AS `sprint`,

`tb_project_version`.`id`,

`tb_project_version`.`name`,

`tb_project_version`.`project_id`,

`tb_project_version`.`sprint_id`,

...略

FROM `tb_project_version`

WHERE `tb_project_version`.`is_delete` = 0

ORDER BY `tb_project`.`id` DESC LIMIT 10 # 假设startIndex=0, endIndex=10

 

projectVersions.count()

等价于

SELECT COUNT(*) AS `__count` FROM `tb_project_version`

WHERE `tb_project_version`.`is_delete` = 0

 

 

 

上述查询代码的另一种实现

projectVersions =  Project.objects.filter(**filters).extra(

select={'project:'tb_project.name',

       'sprint':' tb_sprint.name',

tables=['tb_project', 'tb_sprint'],

where=['tb_project.id=tb_project_version.project_id', 'tb_sprint.id = tb_project_version.sprint_id']

)

rows = projectVersions.order_by(*sort_list)[startIndex:endIndex]

rows = ProjectVersionSerializer(rows, many=True).data

total = projectVersions.count()

 

 

projectVersions.order_by(*sort_list)[startIndex:endIndex]

 

等价于

 

SELECT (tb_project.name) AS `project`,

(tb_sprint.name) AS `sprint`,

`tb_project_version`.`id`,

`tb_project_version`.`name`,

`tb_project_version`.`project_id`,

`tb_project_version`.`sprint_id`,

...略

FROM `tb_project_version`

WHERE `tb_project_version`.`is_delete` = 0 AND (tb_project.id=tb_project_version.project_id) AND (tb_sprint.id = tb_project_version.sprint_id)

ORDER BY `tb_project`.`id` DESC LIMIT 10 # 假设startIndex=0, endIndex=10

 

 

projectVersions.count()

等价于

SELECT COUNT(*) AS `__count` FROM `tb_project_version` , `tb_project` , `tb_sprint` WHERE `tb_project_version`.`is_delete` = 0 AND (tb_project.id=tb_project_version.project_id) AND (tb_sprint.id = tb_project_version.sprint_id)

 

 

实现方法2-通过django rest framework实现

serializers.py

#!/usr/bin/env python

# -*- coding:utf-8 -*-

 

from rest_framework import serializers

from backend.models import ProjectVersion

from backend.models import Sprint

from backend.models import Project

 

 

# ProjectVersion model 序列化器

class ProjectVersionSerializer(serializers.ModelSerializer):

   project = serializers.SerializerMethodField()

   sprint = serializers.SerializerMethodField()

 

   def get_sprint(self, obj):

       """

       :param obj: 当前ProjectVersion的实例

       """

       current_project_version = obj

       sprint = Sprint.objects.filter(id=current_project_version.sprint_id).first()

       if sprint:

           return sprint.name

       else:

           return '--'

 

   def get_project(self, obj):

       """

       :param obj: 当前ProjectVersion的实例

       """

       current_project_version = obj

       project = Project.objects.filter(id=current_project_version.project_id).first()

       if project:

           return project.name

       else:

           return '--'

 

   class Meta:

       model = ProjectVersion

       fields = '__all__'

       read_only_fields = ['project', 'sprint']

 

project_version_views.py

#!/usr/bin/env python

# -*- coding:utf-8 -*-

 

__author__ = '授客'

 

from rest_framework.views import APIView

from rest_framework.response import Response

from rest_framework import status

 

from backend.models import ProjectVersion

from backend.serializers import ProjectVersionSerializer

 

 

 

class ProjectVersionListAPIView(APIView):

   '''

   项目视图-版本管理

   '''

   # 查询列表数据

   def get(self, request, format=None):

       result = {}

       try:

           params =  request.GET

           page_size = int(params.get('pageSize'))

           page_no = int(params.get('pageNo'))

           name = params.get('name')

           project_id = params.get('projectId')

           sort = params.get('sort')

           if sort:

               sort_list = sort.split(',')

           else:

               sort_list = ['-id']

 

           startIndex = (page_no - 1) * page_size

           endIndex = startIndex + page_size

           filters = {'is_delete':0}

           if name:

               filters['name__startswith'] = name

           if project_id:

               filters['project_id'] = project_id

           rows = ProjectVersion.objects.filter(**filters).order_by(*sort_list)[startIndex:endIndex]

           rows = ProjectVersionSerializer(rows, many=True).data

           total = ProjectVersion.objects.filter(**filters).count()

 

           result['msg'] =  '获取成功'

           result['success'] =  True

           result['data'] = {}

           result['data']['rows'] = rows

           result['data']['total'] = total

           return Response(result, status.HTTP_200_OK)

       except Exception as e:

           result['msg'] =  '%s' % e

           result['success'] =  False

           return Response(result, status.HTTP_500_INTERNAL_SERVER_ERROR)

 

方法3-通过raw函数执行原生sql

以下是项目中的一个实例,和本文上述内容没有任何关联,关键部分背景已着色,笔者偷懒,不做过多解释了,简单说下下面这段代码对用途:

 

主要是实现类似以下查询,获取指定分页对数据以及满足条件的记录记录总数。

 

SELECT tb_project.*, project_name_associated, project_id_associated, platform FROM tb_project

LEFT JOIN tb_project_associated ON tb_project.id=tb_project_associated.project_id

ORDER BY id DESC

LIMIT 0,10

 

 

from rest_framework.views import APIView

from rest_framework.response import Response

from rest_framework import status

from backend.models import Project

from backend.serializers import ProjectSerializer

 

import logging

 

 

logger = logging.getLogger('mylogger')

 

class ProjectListAPIView(APIView):

   '''

   项目视图-项目管理-项目列表

   '''

 

   # 查询列表数据

   def get(self, request, format=None):

       result = {}

       try:

           params =  request.GET

           page_size = int(params.get('pageSize'))

           page_no = int(params.get('pageNo'))

           name = params.get('name')

           project_status = params.get('status')

           sort = params.get('sort')

 

           order_by = 'id desc'

           if sort:

               order_by = sort

         

           startIndex = (page_no - 1) * page_size

 

           where = 'WHERE tb_project.is_delete=0 '

           filters = {'is_delete':0}

           if name:

               filters['name__startswith'] = name

               where += 'AND locate("%s", name) ' % name

 

           if project_status:

              where += "AND status='%s'" % project_status

 

           sql = 'SELECT tb_project.id, COUNT(1) AS count FROM tb_project LEFT JOIN tb_project_associated ON tb_project.id=tb_project_associated.project_id '

           query_rows = Project.objects.raw(sql)

           total = query_rows[0].__dict__.get('count') if query_rows else 0

 

           sql =  'SELECT tb_project.*,project_name_associated, project_id_associated, platform FROM tb_project LEFT JOIN tb_project_associated ON tb_project.id=tb_project_associated.project_id ' \

                   '%s ORDER BY %s ' \

                   'LIMIT %s,%s ' % (where,order_by, startIndex, page_size)

           query_rows = Project.objects.raw(sql)

           rows = []

           for item in query_rows:

               item.__dict__.pop('_state')

               item.__dict__['create_time'] = item.__dict__['create_time'].strftime('%Y-%m-%d %H:%M:%S')

               item.__dict__['update_time'] = item.__dict__['update_time'].strftime('%Y-%m-%d %H:%M:%S')

               item.__dict__['begin_time'] = item.__dict__['begin_time'].strftime('%Y-%m-%d')

               item.__dict__['end_time'] = item.__dict__['end_time'].strftime('%Y-%m-%d')

               rows.append(item.__dict__)

       

           result['msg'] =  '获取成功'

           result['success'] =  True

           result['data'] = {}

           result['data']['rows'] = rows

           result['data']['total'] = total

           return Response(result, status.HTTP_200_OK)

       except Exception as e:

           result['msg'] =  '%s' % e

           result['success'] =  False

           return Response(result, status.HTTP_500_INTERNAL_SERVER_ERROR)

 

 

参考链接

https://docs.djangoproject.com/en/1.11/ref/models/querysets/#django.db.models.query.QuerySet.extra

https://www.jianshu.com/p/973971880da7

目录
相关文章
|
5月前
|
SQL 缓存 数据库
10 Django模型 - 模型查询
10 Django模型 - 模型查询
41 0
|
7月前
|
缓存 数据库 Python
django这些查询技巧你会了吗?
django这些查询技巧你会了吗?
|
8月前
|
SQL 数据库 Python
|
8月前
|
SQL Python
|
9月前
|
SQL 数据库 索引
【Django学习】(六)ORM框架_关联模型_数据创建&查询&更新&删除&过滤
【Django学习】(六)ORM框架_关联模型_数据创建&查询&更新&删除&过滤
【Django学习】(六)ORM框架_关联模型_数据创建&查询&更新&删除&过滤
|
11月前
|
关系型数据库 MySQL Python
【一日一技】Django不定项数的“或查询”
【一日一技】Django不定项数的“或查询”
49 0
|
SQL JSON 测试技术
Django model层之执行原始SQL查询
Django model层之执行原始SQL查询
87 0
|
关系型数据库 MySQL 数据库
Django model 层之聚合查询总结
Django model 层之聚合查询总结
79 0
|
安全 小程序 开发工具
Python3+Django2集成PayPal(贝宝)跨境支付三方接口以及订单查询和退款业务
如果您所在的公司涉及外贸或者跨境支付业务,那一定听说过大名鼎鼎的PayPal,总的来说,PayPal在跨国贸易里的优势还是比较大的,作为一种外贸支付方式,目前在国际贸易支付服务中倍受亿万用户追捧,是全球商户和消费者最受欢迎的电子支付方式之一,在跨境交易中有着超过90%的卖家和超过85%的买家认可并正在使用PayPal电子支付业务。当然,PayPal国际业务体量如此惊人,肯定不是毫无原因的。
Python3+Django2集成PayPal(贝宝)跨境支付三方接口以及订单查询和退款业务
|
数据库 Python
Django(10)ORM聚合查询(二)
Django(10)ORM聚合查询(二)
103 0
Django(10)ORM聚合查询(二)