这是 Python Knowledge Weekly(PKW)第一期,之所以做这个东西,主要还是为了激励自己,每周都能有学习输入,同时再把知识点做一个总结输出。希望自己能够坚持下来,点点滴滴,不忘初心。
本周分享知识
一、Django xadmin 的搭建指导
关于 xadmin 的相关知识,网络上已经有很多了,但是每个知识点都是零零散散的,我自己在搭建的过程中也遇到了一些问题,每次都需要重新查找资料,这里做个简单的总结,希望对大家能有帮助。
二、基于 wxPython 的聊天程序
其实这个是在实验楼上看到的课程,跟着做了下来,感觉收获还可以,记录下自己的学习心得,也许以后有的用呢。
Django xadmin 搭建
安装方式
这里有两种安装方式,pip 安装和源码安装,因为本文主要介绍 xadmin 的安装,所以一些 Django 的基础配置操作,就不再令行说明。
当前 pip 安装的 xadmin 还不支持 django 2.0,所以如果我们使用最新的 django 版本,那么就只能使用源码安装了,我这里也主要介绍该种方法。
下载源码
进入到 xadmin 的 GitHub 主页(https://github.com/sshwsfc/xadmin),切换至 django2 分支,然后下载源码到本地。
安装配置
我这使用的是 Python 3.6 + Django 2.1。
首先创建 Django 项目,不多说,例如我创建的 Django 项目名称为 test_xadmin,再创建名称为 app_xadmin 的 app 应用。在该项目的顶级目录下,即与 manage.py 文件同目录下,创建 extra_apps 目录,并将 xadmin 项目源码解压出的 xadmin 目录拷贝至该目录下。
在 Django 项目的 setting.py 文件中添加如下代码:
1import sys 2sys.path.insert(0, os.path.join(BASE_DIR, 'extra_apps')) 3INSTALL_APP = [ 4'app_xadmin', 5'xadmin', 6'crispy_forms', 7]
再在 url.py 中加入如下代码,注册 xadmin 路由
1import xadmin 2urlpatterns = [ 3 url('xadmin/', xadmin.site.urls), 4]
查看 xadmin 的源码可以看到,xadmin 还是自带了一些数据库表的,所以需要先在数据库中生成这些表,执行代码:
1python manage.py makemigrations 2然后执行 3python manage.py migrate
接下来安装一些依赖包,
1pip install future six httplib2
到这里,xadmin 基本就安装完毕。我们打开页面 x.x.x.x:5000,就能够看到一个功能更加强大的 xadmin 页面了。
如果出现添加 user widget 报错的情况,需要将 xadmin/views/dashborad.py 中的 render() 函数添加一个参数 renderer=None 即可。如下所示:
def render(self, name, value, attrs=None, renderer=None):
…
功能完善
配置导入导出功能
xadmin 默认的功能只有导出,并不能支持文件的导入,我们需要下载 django-import-export 依赖包来支持导入导出功能。
1pip install django-import-export
在setting.py中添加如下:
1INSTALLED_APPS = ( 2 ... 3 'import_export', 4)
在 app 应用 app_xadmin 的 models.py 文件中添加代码:
1from django.db import models 2 3class Article(models.Model): 4 title = models.CharField('title', max_length=256) 5 content = models.TextField('content') 6 pub_date = models.DateTimeField('pub_time', auto_now_add=True, editable=True) 7 update_time = models.DateTimeField('up_time', auto_now=True, null=True) 8 9 def __str__(self): 10 return self.title
同时在该目录下创建 resources.py 文件,添加代码:
1from import_export import resources 2from .models import Article 3class ArticleXResource(resources.ModelResource): 4 class Meta: 5 model = Article
再创建 adminx.py 文件,添加代码:
1from xadmin import views 2import xadmin 3from .models import Article 4from import_export import resources 5from .resources import ArticleXResource 6 7@xadmin.sites.register(Article) 8class ArticleAdmin(object): 9 list_display = ['title', 'content'] 10 import_export_args = {'import_resource_class': ArticleXResource, 'export_resource_class': ArticleXResource} 11 list_export = () # 去掉 Django 默认的导出按钮
现在页面展示如下:
运行 django 程序
使用 gunicorn 部署 django,安装 gunicorn
1pip install gunicorn
首先在 url.py 中加入:
1from django.contrib.staticfiles.urls import staticfiles_urlpatterns 2 3... 4 5urlpatterns += staticfiles_urlpatterns()
然后,在 manage.py 的同级目录下,运行
1/usr/local/python/bin/gunicorn test_xadmin.wsgi -b 0.0.0.0:8000
这样,就在本地的 8000 端口启动了服务
如果出现默写 css 样式展示的问题,可以执行下 python manage.py collectstatic 命令,来搜集静态文件到 settings.py 中设置的 STATIC_ROOT 文件夹中。
基于 wxPython 的聊天程序
入门 Hello World
1import wx 2app = wx.App(false) 3frame = wx.Frame(None, title="Hello World") 4frame.Show() #展示 5app.MainLoop() #启动事件循环
如图:
编写 server 端
使用 asynchat 和 asyncore 两个 Python 的异步通信模块
1import asynchat 2import asyncore 3 4PORT = 6666 5 6 7class EndSession(Exception): 8 pass 9 10 11class ChatSession(asynchat.async_chat): 12 def __init__(self, server, sock): 13 asynchat.async_chat.__init__(self, sock) 14 self.server = server 15 self.set_terminator(b'\n') 16 self.data = [] 17 self.name = None 18 self.enter(LoginRoom(server)) 19 20 def enter(self, room): 21 try: 22 cur = self.room 23 except AttributeError: 24 pass 25 else: 26 cur.remove(self) 27 self.room = room 28 room.add(self) 29 30 def collect_incoming_data(self, data): 31 self.data.append(data.decode("utf-8")) 32 33 def found_terminator(self): 34 line = ''.join(self.data) 35 self.data = [] 36 try: 37 self.room.handle(self, line.encode("utf-8")) 38 except EndSession: 39 self.handle_close() 40 41 def handle_close(self): 42 asynchat.async_chat.handle_close(self) 43 self.enter(LoginRoom(self.server)) 44 45 46class ChatServer(asyncore.dispatcher): 47 def __init__(self, port): 48 asyncore.dispatcher.__init__(self) 49 self.create_socket() 50 self.set_reuse_addr() 51 self.bind(('', port)) 52 self.listen(5) 53 self.users = {} 54 self.main_room = ChatRoom(self) 55 56 def handle_accept(self): 57 comm, addr = self.accept() 58 ChatSession(self, comm) 59 60 61class CommandHandler: 62 def unknown(self, session, cmd): 63 session.push(('Unknown command {} \n'.format(cmd))).encode("utf-8") 64 65 def handle(self, session, line): 66 line = line.decode() 67 if not line.strip(): 68 return 69 parts = line.split(' ', 1) 70 cmd = parts[0] 71 try: 72 line = parts[1].strip() 73 except IndexError: 74 line = '' 75 method = getattr(self, 'do_' + cmd, None) 76 try: 77 method(session, line) 78 except TypeError: 79 self.unknown(session, cmd) 80 81 82class Room(CommandHandler): 83 def __init__(self, server): 84 self.server = server 85 self.sessions = [] 86 87 def add(self, session): 88 self.sessions.append(session) 89 90 def remove(self, session): 91 self.sessions.remove(session) 92 93 def broadcast(self, line): 94 for session in self.sessions: 95 session.push(line) 96 97 def do_logout(self, session, line): 98 raise EndSession 99 100 101class LoginRoom(Room): 102 def add(self, session): 103 Room.add(self, session) 104 session.push(b'Connect Success') 105 106 def do_login(self, session, line): 107 name = line.strip() 108 if not name: 109 session.push(b'UserName Empty') 110 elif name in self.server.users: 111 session.push(b'UserName Exist') 112 else: 113 session.name = name 114 session.enter(self.server.main_room) 115 116 117class LogoutRoom(Room): 118 def add(self, session): 119 try: 120 del self.server.users[session.name] 121 except KeyError: 122 pass 123 124 125class ChatRoom(Room): 126 def add(self, session): 127 session.push(b'Login Success') 128 self.broadcast((session.name + 'has entered the room.\n').encode("utf-8")) 129 self.server.users[session.name] = session 130 Room.add(self, session) 131 132 def remove(self, session): 133 Room.remove(self, session) 134 self.broadcast((session.name + 'has left the room.\n').encode("utf-8")) 135 136 def do_say(self, session, line): 137 self.broadcast((session.name + ':' + line + '\n').encode("utf-8")) 138 139 def do_look(self, session, line): 140 session.push(b'Online Users:\n') 141 for other in self.sessions: 142 session.push((other.name + '\n').encode("utf-8")) 143 144 145if __name__ == '__main__': 146 s = ChatServer(PORT) 147 try: 148 print("chat server run at '0.0.0.0:{0}'".format(PORT)) 149 asyncore.loop() 150 except KeyboardInterrupt: 151 print("chat server exit")
编写 client 端
使用 telnet 的方式来登陆,所以需要用到 telnetlib 模块
1import wx 2import telnetlib 3from time import sleep 4import _thread as thread 5 6 7class LoginFrame(wx.Frame): 8 def __init__(self, parent, id, title, size): 9 wx.Frame.__init__(self, parent, id, title) 10 self.SetSize(size) 11 self.Center() 12 self.serverAddressLabel = wx.StaticText(self, label="Server Address", pos=(10, 50), size=(120, 25)) 13 self.userNameLabel = wx.StaticText(self, label="UserName", pos=(40, 100), size=(120, 25)) 14 self.serverAddress = wx.TextCtrl(self, pos=(120, 47), size=(150, 25)) 15 self.userName = wx.TextCtrl(self, pos=(120, 97), size=(150, 25)) 16 self.loginButton = wx.Button(self, label='Login', pos=(80, 145), size=(130, 30)) 17 self.loginButton.Bind(wx.EVT_BUTTON, self.login) 18 self.Show() 19 20 def login(self, event): 21 try: 22 serverAddress = self.serverAddress.GetLineText(0).split(':') 23 con.open(serverAddress[0], port=int(serverAddress[1]), timeout=10) 24 response = con.read_some() 25 if response != b'Connect Success': 26 self.showDialog('Error', 'Connect Fail!', (200, 100)) 27 return 28 con.write(('login ' + str(self.userName.GetLineText(0)) + '\n').encode("utf-8")) 29 response = con.read_some() 30 if response == b'UserName Empty': 31 self.showDialog('Error', 'UserName Empty!', (200, 100)) 32 elif response == b'UserName Exist': 33 self.showDialog('Error', 'UserNmae Exist!', (200, 100)) 34 else: 35 self.Close() 36 ChatFrame(None, 2, title='My Chat Client', size=(500, 400)) 37 except Exception: 38 self.showDialog('Error', 'Connect Fail!', (95, 20)) 39 40 def showDialog(self, title, content, size): 41 dialog = wx.Dialog(self, title=title, size=size) 42 dialog.Center() 43 wx.StaticText(dialog, label=content) 44 dialog.ShowModal() 45 46 47class ChatFrame(wx.Frame): 48 def __init__(self, parent, id, title, size): 49 wx.Frame.__init__(self, parent, id, title) 50 self.SetSize(size) 51 self.Center() 52 self.chatFrame = wx.TextCtrl(self, pos=(5, 5), size=(490, 310), style=wx.TE_MULTILINE | wx.TE_READONLY) 53 self.message = wx.TextCtrl(self, pos=(5, 320), size=(300, 25)) 54 self.sendButton = wx.Button(self, label="Send", pos=(310, 320), size=(58, 25)) 55 self.usersButton = wx.Button(self, label="Users", pos=(373, 320), size=(58, 25)) 56 self.closeButton = wx.Button(self, label="Close", pos=(436, 320), size=(58, 25)) 57 58 self.sendButton.Bind(wx.EVT_BUTTON, self.send) 59 60 self.usersButton.Bind(wx.EVT_BUTTON, self.lookUsers) 61 62 self.closeButton.Bind(wx.EVT_BUTTON, self.close) 63 thread.start_new_thread(self.receive, ()) 64 self.Show() 65 66 def send(self, event): 67 68 message = str(self.message.GetLineText(0)).strip() 69 if message != '': 70 con.write(('say ' + message + '\n').encode("utf-8")) 71 self.message.Clear() 72 73 def lookUsers(self, event): 74 75 con.write(b'look\n') 76 77 def close(self, event): 78 79 con.write(b'logout\n') 80 con.close() 81 self.Close() 82 83 def receive(self): 84 85 while True: 86 sleep(0.6) 87 result = con.read_very_eager() 88 if result != '': 89 self.chatFrame.AppendText(result) 90 91 92if __name__ == '__main__': 93 app = wx.App() 94 con = telnetlib.Telnet() 95 LoginFrame(None, -1, title="Login", size=(320, 250)) 96 app.MainLoop()
这样,我们启动 server 端,开始监听。再打开两个 client 端,进行交流
非常简易的聊天程序,感兴趣的可以继续添加功能来完善。
GitHub 地址: