# -*- coding: utf-8 -*- # peewee模块操作数据库 # playhouse模块不用单独装,装完peewee就有了 import peewee import random import hashlib from chinesename import chinesename from playhouse.shortcuts import dict_to_model, model_to_dict province_str =("北京市,天津市,上海市,重庆市,河北省,山西省," "辽宁省,吉林省,黑龙江省,江苏省,浙江省,安徽省," "福建省,江西省,山东省,河南省,湖北省,湖南省," "广东省,海南省,四川省,贵州省,云南省,陕西省," "甘肃省,青海省,台湾省,内蒙古自治区,广西壮族自治区," "西藏自治区,宁夏回族自治区,新疆维吾尔自治区," "香港特别行政区,澳门特别行政区") provinces = province_str.split(",") # 指定数据库 db = peewee.SqliteDatabase("mydata.db") class BaseModel(peewee.Model): class Meta: database = db # 定义数据表 class UserA(BaseModel): name = peewee.CharField() age = peewee.IntegerField() province = peewee.CharField(null=True) md5 = peewee.CharField(null=True) class Meta: table_name = "user_a" class UserB(BaseModel): name = peewee.CharField() age = peewee.IntegerField() province = peewee.CharField(null=True) md5 = peewee.CharField(null=True) class Meta: table_name = "user_b" # 创建数据表 db.connect() db.create_tables([UserA, UserB], safe=True) db.close() # 生成数据 def foo1(): # 生成虚拟数据 cn = chinesename.ChineseName() lst = [] for i in range(1000): name = cn.getName() age = random.randrange(1, 100) province = random.choice(provinces) md5 = hashlib.md5(name+str(age)+province).hexdigest() dct = { "name": name, "age": age, "province": province, "md5": md5 } lst.append(dct) # 多条记录同时插入,提高速度 UserA.insert_many(lst).execute() # 查询表中数据数量 usera_count = UserA.select().count() print "写入完毕:", usera_count # 数据迁移 def foo2(): # 随机取出一定数量的数据 rets = UserA.select().order_by(peewee.fn.Random()).limit(1000) print len(rets) lst = [] for ret in rets: dct = model_to_dict(ret) dct.pop("id") # 删除字典中的id字段 # 通过 name 字段判断,避免重复插入 ret = UserB.select().where(UserB.name == dct["name"]).first() if ret == None: lst.append(dct) if len(lst)>0: UserB.insert_many(lst).execute() print "数据拷贝完成", "取出数据:%d"%len(rets), "成功插入%d"%len(lst) # 查询表中数据数量 userb_count = UserB.select().count() print "数据数量:", userb_count # 字典转model对象 dct = { "name": "Tom", "age": 20, "province": "北京市", "md5": "xxxxx" } user = dict_to_model(UserA, dct) print type(user) # <class '__main__.UserA'> # model对象转字典 user = UserA.create(name="Tom", age=20, province="北京市", md5="xxx") dct = model_to_dict(user) print type(dct) # <type 'dict'>