日拱一卒无有尽,功不唐捐终入海!
一、背景
在整理公司项目的时候 ,不晓得什么时候接了一个关于疫情的项目,需要可视化展示数据 ,大致了解过项目结构和实现逻辑,先是从公共接口爬取数据,然后清洗数据再写入数据库,最后使用flask后台框架启动访问web,读取数据库的数据,经过前端渲染,如下图所示:
1.1、调整原来的项目结构
新增了conf目录,作为该项目的数据配置项,譬如mysql信息就统一管理,因为原项目在spider和sqldata两处都创建了mysql连接,所以只保留一处即可
def mysql():
"""创建mysql连接,返回数据库操作对象"""
db = pymysql.connect(host=MYSQL_HOST, user=MYSQL_DB_USER, password=MYSQL_DB_PASSWORD, database=MYSQL_DB_NAME, charset='utf8')
cur = db.cursor()
return db, cur
def close(db, cur):
"""关闭数据库连接"""
cur.close()
db.close()
def query(sql: str, *args):
"""执行sql并返回结果"""
db, cur = mysql()
cur.execute(sql, args)
result = cur.fetchall()
close(db, cur)
return result
二、新增/优化功能
原项目中在spider文件中,每次操作都会在控制台输出print日志,这个样子有点不美观
2.1、新增logger模块
创建libs作为整个项目的工具类
class Logger(object):
__instance = None
def __new__(cls, *args, **kwargs):
if not cls.__instance:
cls.__instance = object.__new__(cls, *args)
return cls.__instance
def __init__(self):
self.formater = logging.Formatter(
'[%(asctime)s] [%(levelname)s] [%(module)s:%(lineno)d ], %(message)s')
self.logger = logging.getLogger('log')
self.logger.setLevel(logging.DEBUG)
self.filelogger = handlers.RotatingFileHandler(LOG_PATH,
maxBytes=5242880,
backupCount=3,
encoding='utf-8')
# self.console = logging.StreamHandler()
# self.console.setLevel(logging.DEBUG)
self.filelogger.setFormatter(self.formater)
# self.console.setFormatter(self.formater)
self.logger.addHandler(self.filelogger)
# self.logger.addHandler(self.console)
- 日志输出乱码?
在文件处理对象的时候,需要指定encoding=‘utf-8’,避免乱码
2.2、实时刷新数据
前面读了spider模块,每次执行都会先删除表然后再创建最后写入数据,从数据库配置来看完全没有任何压力,但是启动app时,它并没有刷新数据库,每次都会获取第一次执行spider的数据,所以需要做到实时刷新,也就是spider需要每次都执行。
@app.route('/')
def hello_world():
logger.logger.info("用来刷新数据...")
main() # 这个就是每次操作数据库,删除、创建、写入
return render_template('main.html')
三、总结
就整个项目而言,原先是需要先执行spider然后再启动app,所以没那么智能,在经过调整后,只需要启动app就会连带讲数据库的数据刷新一遍,每次都会是最新数据,尽管是每天的某个固定时间才会更新,总好过每次手动指定spider模块;
- 还有一点优化,就是requirements.txt;在当前环境安装pipreqs,然后在工程根路径下执行pipreqs ./就可以获得项目相关的库依赖,这样就方便项目迁移了。
附录spider模块代码<关注我,获取源码>:
def get_json(url):
headers = {
'user - agent': 'Mozilla / 5.0(WindowsNT10.0;Win64;x64) AppleWebKit / 537.36(KHTML, likeGecko) Chrome / 80.0.3987.116Safari / 537.36'
}
response = requests.get(url=url, headers=headers).text
data = json.loads(response)
return data['data']
# 获取当日数据
def get_now(data):
now = []
data_time = str(data['diseaseh5Shelf']['lastUpdateTime']) # 数据更新时间
data_all = data['diseaseh5Shelf']['areaTree'][0]
data_province_s = data['diseaseh5Shelf']['areaTree'][0]['children']
# 获取全国今日新增、累计确诊、治愈人数、死亡人数
confirms = data_all['total']['confirm']
confirms_add = data_all['today']['confirm']
heals = data_all['total']['heal']
deads = data_all['total']['dead']
# 获取每个省份的每个城市今日新增、累计确诊、治愈人数、死亡人数
for data_province in data_province_s:
province = data_province['name'] # 省份
for data_city in data_province['children']:
city = data_city['name'] # 城市
confirm = data_city['total']['confirm'] # 确诊
confirm_add = data_city['today']['confirm'] # 新增
heal = data_city['total']['heal'] # 治愈
dead = data_city['total']['dead'] # 死亡
now.append((data_time, province, city, confirm_add, confirm, heal, dead))
return confirms, confirms_add, heals, deads, now
# 获取历史数据
def get_past(data):
past = {
}
for data_day in data:
data_time = data_day['date'] # 获取最原始的时间
time_deal = time.strptime(data_time, '%m.%d') # 根据指定的格式把一个时间字符串解析为时间元组
date = time.strftime('%m-%d', time_deal) # 重新组成新的时间字符串
past[date] = {
'confirm': data_day['confirm'], # 确诊
'suspect': data_day['suspect'], # 疑似
'heal': data_day['heal'], # 治愈
'dead': data_day['dead'] # 死亡
}
return past
# 写入当日数据
def insert_now(now):
try:
logger.logger.info("每次这个很有意思,每次刷新都需要先删除数据表,然后再创建表,最后再将清洗的数据写入表中.")
cur.execute("DROP TABLE IF EXISTS 当日数据")
# 写创建表的sql语句
set_sql_now = "create table 当日数据(时间 varchar(100),省份 varchar(50),城市 varchar(50),新增确诊 int(11)," \
"确诊人数 int(11),治愈人数 int(11),死亡人数 int(11))ENGINE=InnoDB DEFAULT CHARSET=utf8"
# 执行sql语句
cur.execute(set_sql_now)
# 保存
db.commit()
# 写入数据库
save_sql_now = "insert into 当日数据 values(%s,%s,%s,%s,%s,%s,%s)"
cur.executemany(save_sql_now, now) # now位置必须是个列表,列表里面的元素是数组
db.commit()
logger.logger.info('当日数据写入成功')
except Exception as e:
logger.logger.error('当日数据写入失败原因:%s' % e)
# 写入历史数据
def insert_past(past):
try:
cur.execute("DROP TABLE IF EXISTS 历史数据")
# 写创建表的sql语句
set_sql_past = "create table 历史数据(时间 varchar(100),确诊人数 int(11),疑似病例 int(11),治愈人数 int(11)," \
"死亡人数 int(11))ENGINE=InnoDB DEFAULT CHARSET=utf8"
# 执行sql语句
cur.execute(set_sql_past)
# 保存
db.commit()
# 写入历史数据
for date, data in past.items():
sql_past = f"insert into 历史数据 values('{date}',{data['confirm']},{data['suspect']},{data['heal']}," \
f"{data['dead']})"
cur.execute(sql_past)
db.commit()
logger.logger.info('历史数据写入成功')
except Exception as e:
logger.logger.error('历史数据写入失败原因:%s' % e)
# 写入历史新增数据
def insert_past_add(past):
try:
cur.execute("DROP TABLE IF EXISTS 历史新增数据")
# 写创建表的sql语句
set_sql_past = "create table 历史新增数据(时间 varchar(100),新增确诊 int(11),新增疑似 int(11),新增治愈 int(11)," \
"新增死亡 int(11))ENGINE=InnoDB DEFAULT CHARSET=utf8"
# 执行sql语句
cur.execute(set_sql_past)
# 保存
db.commit()
# 写入历史数据
for date, data in past.items():
sql_past = f"insert into 历史新增数据 values('{date}',{data['confirm']},{data['suspect']},{data['heal']}," \
f"{data['dead']})"
cur.execute(sql_past)
db.commit()
logger.logger.info('历史新增数据写入成功')
except Exception as e:
logger.logger.error('历史新增数据写入失败原因:%s' % e)
# 写入累计确诊等四个关键数据
def insert_main(confirm, confirm_add, heal, dead):
try:
cur.execute("DROP TABLE IF EXISTS 关键数据")
# 写创建表的sql语句
set_sql_main = "create table 关键数据(确诊人数 int(11),新增确诊 int(11),治愈人数 int(11),死亡人数 int(11))" \
"ENGINE=InnoDB DEFAULT CHARSET=utf8"
cur.execute(set_sql_main)
db.commit()
# 写入确诊人数、新增确诊、治愈人数、死亡人数的数据
save_sql_main = f"insert into 关键数据 values({confirm},{confirm_add},{heal},{dead})"
cur.execute(save_sql_main)
db.commit()
logger.logger.info('关键数据写入成功')
except Exception as e:
logger.logger.error('关键数据写入失败原因:%s' % e)