python tornado 实现类禅道系统

简介: 笔记

背景:目前市面上有很多bug管理工具,但是各有各的特点,最著名,最流行的就是禅道,一个偶然的机会接触到了python ,学到tornado后,就想着去怎么去用到实处,后来发现自己公司的除了禅道就记录bug没有什么可以用的工具了。

语言:python3 第三库 :tornado,qiniu(用于云存储文件),数据库用sqlite

why  use tornado?很多人其实会这么问我,我感觉tornado可以实现异步,虽然现在代码还没有用到异步,我感觉还是很不错的框架,值得学习,现在很多公司都在用,个人感觉这是一个不错的,值得我们大家去学习的框架。

来看看我的需求文档

13.png

大题是这么的轮廓,那么拿到这个的时候,我会进行需求分析,

需要什么样的数据库, 数据模型之间的关系,虽然现在还是有很多地方是写死的还没有进行搜索功能的设置,但是我相信,有了现在这个demo

那么我开始来设计下我主要会结构

14.png

handlsers 存放类似flask的views

models存数据库相关的,

static存放静态

template存放模板

untils存放公共库

setting 配置文件

urls url地址映射

run 运行文件。

那么我来开始设计我的数据,其实我的数据模型也是一波三折的。

选择用了sqlalchemy,之前用过flask的sqlalchemy感觉不错

from models.dataconfig import db_session,Base,create_all
from sqlalchemy import  Column,Integer,DateTime,Boolean,String,ForeignKey,desc,asc,Text
from sqlalchemy.orm import  relationship,backref
from untils.common import encrypt
import datetime
class User(Base):
    __tablename__='users'
    id=Column(Integer(),primary_key=True)
    username=Column(String(64),unique=True,index=True)
    email=Column(String(64))
    password=Column(String(64))
    last_logtime=Column(DateTime())
    status=Column(Integer())
    leves=Column(Integer())
    iphone=Column(Integer())
    Projects=relationship('Project',backref='users')
    shebei=relationship('Shebei',backref='users')
    file=relationship('FilePan',backref='users')
    banben=relationship('BanbenWrite',backref='users')
    testresult=relationship('TestResult',backref='users')
    testcase=relationship('TestCase',backref='users')
    buglog=relationship('BugLog',backref='users')
    def __repr__(self):
        return self.username
    @classmethod
    def get_by_id(cls, id):
        item = db_session.query(User).filter(User.id==id).first()
        return item
    @classmethod
    def get_by_username(cls, username):
        item = db_session.query(User).filter(User.username== username).first()
        return item
    @classmethod
    def get_count(cls):
        return db_session.query(Shebei).count()
    @classmethod
    def add_new(cls,username,password,iphone,email,leves):
        new=User(username=username,iphone=iphone,email=email,leves=leves)
        new.password=encrypt(password)
        new.status=0
        db_session.add(new)
        try:
            db_session.commit()
        except:
            db_session.rollback()
class Shebei(Base):
    __tablename__='shebeis'
    id=Column(Integer(),primary_key=True)
    shebei_id=Column(String(32),unique=True)
    shebei_name=Column(String(64))
    shebei_xitong=Column(String(64))
    shebei_xinghao=Column(String(255))
    shebei_jiage=Column(Integer())
    shebei_fapiaobianhao=Column(String(64))
    shebei_quanxian=Column(Boolean())
    shebei_jie=Column(Boolean())
    shebei_shuyu=Column(String())
    shebei_date=Column(DateTime())
    shebei_user=Column(String())
    gou_date=Column(DateTime())
    shebei_status=Column(String(16))
    she_sta=Column(Integer(),default=0)
    ruku_user=Column(Integer(),ForeignKey('users.id'))
    def __repr__(self):
        return self.shebei_name
    @classmethod
    def get_by_name(cls,name):
        item=db_session.query(Shebei).filter(Shebei.shebei_name==name).first()
        return item
    @classmethod
    def get_by_id(cls,id):
        item=db_session.query(Shebei).filter(Shebei.id==id).first()
        return item
    @classmethod
    def get_count(cls):
        return db_session.query(Shebei).count()
class TestResult(Base):
    __tablename__='testresults'
    id=Column(Integer(),primary_key=True)
    porject_id=Column(Integer(),ForeignKey('projects.id'))
    creat_time=Column(DateTime())
    bug_first=Column(Integer())
    ceshirenyuan=Column(String(255))
    is_send=Column(Boolean(),default=True)
    filepath=Column(String(64))
    status=Column(Integer(),default=0)
    user_id=Column(Integer(),ForeignKey('users.id'))
    def __repr__(self):
        return self.porject_name
    @classmethod
    def get_by_name(cls,name):
        item=db_session.query(TestResult).filter(TestResult.porject_name==name).first()
        return item
    @classmethod
    def get_by_id(cls,id):
        item=db_session.query(TestResult).filter(TestResult.id==id).first()
        return item
    @classmethod
    def get_by_user_id(cls,user_id):
        item=db_session.query(TestResult).filter(TestResult.user_id==user_id).first()
        return item
    @classmethod
    def get_count(cls):
        return db_session.query(TestResult).count()
class BanbenWrite(Base):
    __tablename__='banbens'
    id=Column(Integer(),primary_key=True)
    porject_id=Column(Integer(),ForeignKey('projects.id'))
    creat_time=Column(DateTime(),default=datetime.datetime.now())
    banbenhao=Column(String(32))
    is_xian=Column(Boolean(),default=False)
    is_test=Column(Boolean(),default=False)
    status=Column(Integer())
    user_id=Column(Integer(),ForeignKey('users.id'))
    bugadmin=relationship('BugAdmin',backref='banbens')
    def __repr__(self):
        return self.banbenhao
    @classmethod
    def get_by_name(cls,name):
        item=db_session.query(BanbenWrite).filter(BanbenWrite.porject_name==name).first()
        return item
    @classmethod
    def get_by_id(cls,id):
        item=db_session.query(BanbenWrite).filter(BanbenWrite.id==id).first()
        return item
    @classmethod
    def get_by_user_id(cls,user_id):
        item=db_session.query(BanbenWrite).filter(BanbenWrite.user_id==user_id).first()
        return item
    @classmethod
    def get_count(cls):
        return db_session.query(BanbenWrite).count()
class FilePan(Base):
    __tablename__='files'
    id=Column(Integer(),primary_key=True)
    file_fenlei=Column(String(64))
    file_name=Column(String(64))
    down_count=Column(Integer(),default=0)
    creat_time=Column(DateTime(),default=datetime.datetime.now())
    status=Column(Integer(),default=0)
    down_url=Column(String(64))
    is_tui=Column(Boolean(),default=False)
    user_id=Column(Integer(),ForeignKey('users.id'))
    def __repr__(self):
        return self.file_name
    @classmethod
    def get_by_file_name(cls,name):
        item=db_session.query(FilePan).filter(FilePan.file_name==name).first()
        return item
    @classmethod
    def get_by_id(cls,id):
        item=db_session.query(FilePan).filter(FilePan.id==id).first()
        return item
    @classmethod
    def get_by_user_id(cls,user_id):
        item=db_session.query(FilePan).filter(FilePan.user_id==user_id).first()
        return item
    @classmethod
    def get_count(cls):
        return db_session.query(FilePan).count()
class BugAdmin(Base):
    __tablename__='bugadmins'
    id=Column(Integer(),primary_key=True)
    porject_id=Column(Integer(),ForeignKey('projects.id'))
    bugname=Column(String(64))
    bugdengji=Column(String(64))
    bugtime=Column(DateTime(),default=datetime.datetime.now())
    bug_miaoshu=Column(String(255))
    ban_id=Column(Integer(),ForeignKey('banbens.id'))
    fujian=Column(String(64))
    is_que=Column(Boolean())
    bug_status=Column(String(64))
    bug_jiejuefangan=Column(String(64))
    bug_send=Column(String(64))
    status=Column(Integer(),default=0)
    bug_log=relationship('BugLog',backref='bugadmins')
    user_id=Column(Integer(),ForeignKey('users.id'))
    def __repr__(self):
        return self.bugname
    @classmethod
    def get_by_bugname(cls,bugname):
        item=db_session.query(BugAdmin).filter(BugAdmin.bugname==bugname).first()
        return item
    @classmethod
    def get_by_id(cls,id):
        item=db_session.query(BugAdmin).filter(BugAdmin.id==id).first()
        return item
    @classmethod
    def get_by_porject_name(cls,porject_name):
        item=db_session.query(BugAdmin).filter(BugAdmin.porject_name==porject_name).first()
        return item
    @classmethod
    def get_count(cls):
        return db_session.query(BugAdmin).count()
class TestCase(Base):
    __tablename__='testcases'
    id=Column(Integer(),primary_key=True)
    porject_id=Column(Integer(),ForeignKey('projects.id'))
    casename=Column(String(64))
    case_qianzhi=Column(String())
    case_buzhou=Column(String())
    case_yuqi=Column(String())
    status=Column(Integer(),default=0)
    case_crea_time=Column(DateTime(),default=datetime.datetime.now())
    user_id=Column(Integer(),ForeignKey('users.id'))
    def __repr__(self):
        return self.casename
    @classmethod
    def get_by_project_name(Cls,project_name):
        item=db_session.query(TestCase).filter(TestCase.project_name==project_name).first()
        return item
    @classmethod
    def get_by_casename(Cls,casename):
        item=db_session.query(TestCase).filter(TestCase.casename==casename).first()
        return item
    @classmethod
    def get_by_id(cls,id):
        item=db_session.query(TestCase).filter(TestCase.id==id).first()
        return item
    @classmethod
    def get_count(cls):
        return db_session.query(TestCase).count()
class BugLog(Base):
    __tablename__='buglogs'
    id=Column(Integer(),primary_key=True)
    bug_id=Column(Integer(),ForeignKey('bugadmins.id'))
    caozuo=Column(String())
    caozuo_time=Column(DateTime())
    user_id=Column(Integer(),ForeignKey('users.id'))
    def __repr__(self):
        return self.caozuo
    @classmethod
    def get_by_id(Cls,id):
        item=db_session.query(BugLog).filter(BugLog.id==id).first()
        return item
    @classmethod
    def get_by_user_id(Cls,user_id):
        item=db_session.query(BugLog).filter(BugLog.user_id==user_id).first()
        return item
    @classmethod
    def get_by_bug_id(Cls,bug_id):
        item=db_session.query(BugLog).filter(BugLog.bug_id==bug_id).first()
        return item
class Project(Base):
    __tablename__='projects'
    id=Column(Integer(),primary_key=True)
    name=Column(String(64))
    user_id=Column(Integer(),ForeignKey('users.id'))
    bug_log=relationship('BugAdmin',backref='projects')
    banben=relationship('BanbenWrite',backref='projects')
    testresult=relationship('TestResult',backref='projects')
    testcase=relationship('TestCase',backref='projects')
    def __repr__(self):
        return self.name
    @classmethod
    def get_by_id(cls,id):
        item=db_session.query(Project).filter(Project.id==id).first()
        return item
    @classmethod
    def get_by_name(cls,name):
        item=db_session.query(Project).filter(Project.name==name).first()
        return item

这是数据库相关的,

数据库配置相关的
from sqlalchemy import  create_engine
from  sqlalchemy.orm import  scoped_session,sessionmaker
from sqlalchemy.ext.declarative import  declarative_base
engine=create_engine('sqlite:///shebei.db',convert_unicode=True)
Base=declarative_base()
db_session=scoped_session(sessionmaker(bind=engine))
def create_all():
    Base.metadata.create_all(engine)
def drop_all():
    Base.metadata.drop_all(engine)

其实在开发的过程中,也遇到了很多阻力,比如下载一直实现不好,比如分页,也是参照别人的实现的,

分页公共模块

class Pagination:
    def __init__(self, current_page, all_item):
        try:
            page = int(current_page)
        except:
            page = 1
        if page < 1:
            page = 1
        all_pager, c = divmod(all_item, 10)
        if int(c) > 0:
            all_pager += 1
        self.current_page = page
        self.all_pager = all_pager
    @property
    def start(self):
        return (self.current_page - 1) * 10
    @property
    def end(self):
        return self.current_page * 10
    def string_pager(self, base_url="/index/"):
        if self.current_page == 1:
            prev = '<li><a href="javascript:void(0);">上一页</a></li>'
        else:
            prev = '<li><a href="%s%s">上一页</a></li>' % (base_url, self.current_page - 1,)
        if self.current_page == self.all_pager:
            nex = '<li><a href="javascript:void(0);">下一页</a></li>'
        else:
            nex = '<li><a href="%s%s">下一页</a></li>' % (base_url, self.current_page + 1,)
        last = '<li><a href="%s%s">尾页</a></li>' % (base_url, self.all_pager,)
        str_page = "".join((prev,nex,last))
        return str_page

在上传文件的时候,原来存放在本地,结果呢,下载处理不好,于是乎选择了七牛,需要到七牛的官网去注册自己的账号

from qiniu import Auth,put_file,etag,urlsafe_base64_encode
import qiniu.config
access_key='uVxowDUcYx641ivtUb111WBEI4112L3D117JHNM_AOtskRh4'
secret_key='PdXU9XrXTLtp1N21bhU1Frm1FDZqE1qhjkEaE9d1xVLZ5C'
def sendfile(key,file):
    q=Auth(access_key,secret_key)
    bucket_name='leilei22'
    token = q.upload_token(bucket_name, key)
    ret, info = put_file(token, key, file)
    me= ret['hash']
    f=etag(file)
    if me==f:
        assert_t=True
    else:
        assert_t=False
    return assert_t

解析Excel,主要用于上传测试用例

import xlrd,xlwt
from xlutils.copy import copy
def datacel(filepath):
    file=xlrd.open_workbook(filepath)
    me=file.sheets()[0]
    nrows=me.nrows
    porject_id_list=[]
    casename_list=[]
    case_qianzhi_list=[]
    case_buzhou_list=[]
    case_yuqi_list=[]
    for i in range(1,nrows):
        porject_id_list.append(me.cell(i,0).value)
        casename_list.append(me.cell(i,2).value)
        case_qianzhi_list.append(me.cell(i,3).value)
        case_buzhou_list.append(me.cell(i,4).value)
        case_yuqi_list.append(me.cell(i,1).value)
    return porject_id_list,casename_list,case_qianzhi_list,case_buzhou_list,case_yuqi_list

其实这么现在公共模块完毕了,其实现在可以着手去开始写我们的代码了,主要的代码,还有静态界面,因为前后端都是我自己,我的前端其实还是从网上找来的模板,

学习的道路是痛苦的,但是我相信我是可以成功的,

from tornado.web import RequestHandler
from models.model_py import User
class BaseHandler(RequestHandler):
    @property
    def db(self):
        return self.application.db
    def get_current_user(self):
        user_id = self.get_secure_cookie('user_id')
        if not user_id:
            return None
        return User.get_by_id(int(user_id))

基础的类集成了RequestHandler的类,,进行了一些简单的自定义,然后后续可以用这个,

进过两周的开发,已经形成了成熟的,


相关文章
|
23天前
|
机器学习/深度学习 人工智能 算法
猫狗宠物识别系统Python+TensorFlow+人工智能+深度学习+卷积网络算法
宠物识别系统使用Python和TensorFlow搭建卷积神经网络,基于37种常见猫狗数据集训练高精度模型,并保存为h5格式。通过Django框架搭建Web平台,用户上传宠物图片即可识别其名称,提供便捷的宠物识别服务。
232 55
|
2月前
|
机器学习/深度学习 数据采集 供应链
使用Python实现智能食品安全追溯系统的深度学习模型
使用Python实现智能食品安全追溯系统的深度学习模型
76 4
|
11天前
|
存储 缓存 监控
局域网屏幕监控系统中的Python数据结构与算法实现
局域网屏幕监控系统用于实时捕获和监控局域网内多台设备的屏幕内容。本文介绍了一种基于Python双端队列(Deque)实现的滑动窗口数据缓存机制,以处理连续的屏幕帧数据流。通过固定长度的窗口,高效增删数据,确保低延迟显示和存储。该算法适用于数据压缩、异常检测等场景,保证系统在高负载下稳定运行。 本文转载自:https://www.vipshare.com
103 66
|
1天前
|
机器学习/深度学习 人工智能 算法
基于Python深度学习的眼疾识别系统实现~人工智能+卷积网络算法
眼疾识别系统,本系统使用Python作为主要开发语言,基于TensorFlow搭建卷积神经网络算法,并收集了4种常见的眼疾图像数据集(白内障、糖尿病性视网膜病变、青光眼和正常眼睛) 再使用通过搭建的算法模型对数据集进行训练得到一个识别精度较高的模型,然后保存为为本地h5格式文件。最后使用Django框架搭建了一个Web网页平台可视化操作界面,实现用户上传一张眼疾图片识别其名称。
14 4
基于Python深度学习的眼疾识别系统实现~人工智能+卷积网络算法
|
1月前
|
机器学习/深度学习 人工智能 算法
【宠物识别系统】Python+卷积神经网络算法+深度学习+人工智能+TensorFlow+图像识别
宠物识别系统,本系统使用Python作为主要开发语言,基于TensorFlow搭建卷积神经网络算法,并收集了37种常见的猫狗宠物种类数据集【'阿比西尼亚猫(Abyssinian)', '孟加拉猫(Bengal)', '暹罗猫(Birman)', '孟买猫(Bombay)', '英国短毛猫(British Shorthair)', '埃及猫(Egyptian Mau)', '缅因猫(Maine Coon)', '波斯猫(Persian)', '布偶猫(Ragdoll)', '俄罗斯蓝猫(Russian Blue)', '暹罗猫(Siamese)', '斯芬克斯猫(Sphynx)', '美国斗牛犬
167 29
【宠物识别系统】Python+卷积神经网络算法+深度学习+人工智能+TensorFlow+图像识别
|
1天前
|
Python
[oeasy]python057_如何删除print函数_dunder_builtins_系统内建模块
本文介绍了如何删除Python中的`print`函数,并探讨了系统内建模块`__builtins__`的作用。主要内容包括: 1. **回忆上次内容**:上次提到使用下划线避免命名冲突。 2. **双下划线变量**:解释了双下划线(如`__name__`、`__doc__`、`__builtins__`)是系统定义的标识符,具有特殊含义。
14 3
|
4天前
|
安全 前端开发 数据库
Python 语言结合 Flask 框架来实现一个基础的代购商品管理、用户下单等功能的简易系统
这是一个使用 Python 和 Flask 框架实现的简易代购系统示例,涵盖商品管理、用户注册登录、订单创建及查看等功能。通过 SQLAlchemy 进行数据库操作,支持添加商品、展示详情、库存管理等。用户可注册登录并下单,系统会检查库存并记录订单。此代码仅为参考,实际应用需进一步完善,如增强安全性、集成支付接口、优化界面等。
|
13天前
|
存储 算法 Python
文件管理系统中基于 Python 语言的二叉树查找算法探秘
在数字化时代,文件管理系统至关重要。本文探讨了二叉树查找算法在文件管理中的应用,并通过Python代码展示了其实现过程。二叉树是一种非线性数据结构,每个节点最多有两个子节点。通过文件名的字典序构建和查找二叉树,能高效地管理和检索文件。相较于顺序查找,二叉树查找每次比较可排除一半子树,极大提升了查找效率,尤其适用于海量文件管理。Python代码示例包括定义节点类、插入和查找函数,展示了如何快速定位目标文件。二叉树查找算法为文件管理系统的优化提供了有效途径。
45 5
|
1月前
|
机器学习/深度学习 算法 前端开发
基于Python深度学习的果蔬识别系统实现
果蔬识别系统,主要开发语言为Python,基于TensorFlow搭建ResNet卷积神经网络算法模型,通过对12种常见的果蔬('土豆', '圣女果', '大白菜', '大葱', '梨', '胡萝卜', '芒果', '苹果', '西红柿', '韭菜', '香蕉', '黄瓜')图像数据集进行训练,最后得到一个识别精度较高的模型文件。再基于Django框架搭建Web网页端可视化操作界面,以下为项目实现介绍。
41 4
基于Python深度学习的果蔬识别系统实现
|
3月前
|
机器学习/深度学习 传感器 存储
使用 Python 实现智能地震预警系统
使用 Python 实现智能地震预警系统
153 61