1. 目录
2. 背景
很多地方需要用到 统计用区划和城乡划分代码
这块以权威数据为准,但是人家是一个网页。
2016
年我用 Java
实现了一版,感觉使用起来不是很方便,后来又在 2020
年 用 Python
又实现了一次,代码量明显少了很大一部分。
3. 设计
3.1. 分析
行政区域的数据结构比较经典,完全是树形数据结构,随着树的深度增大,数据增长在10倍左右。
- 根节点的数量是 1
- 省/直辖市 数量为 33
- 城市 354
- 区县 3331
3.2. 设计
- 在实现上,使用了
Python
的pymysql
的类库。 - 在数据存储上,利用了数据库,在数据库选择上,本着简单易用的原则选择了
MySQL
,数据库表设计如下:
CREATE TABLE areas (
code VARCHAR(30) DEFAULT NULL,
name VARCHAR(100) DEFAULT NULL,
lv INT(11) DEFAULT NULL,
sup_code VARCHAR(30) DEFAULT NULL,
flag VARCHAR(6) DEFAULT NULL,
url VARCHAR(60) DEFAULT NULL
) ;
- 因为涉及到对数据库的操作,我这里按照设计思想,对数据库的操作,都封装在一个
DbUtil.py
- 数据大的过程中,需要考虑分页
4. 环境
- Pycharm 2021.2.2
- Python 3.10
- Anaconda3
插件 | 说明 |
---|---|
pymysql | |
BeautifulSoup |
通过 Python Interpreter
安装 或者 pip
安装都可以,自行视实际情况自行选择 。
5. 实现
在 AreaMa.py
中,依次执行 init_province()
、 init_city()
、 init_county()
、 init_town()
、 init_village()
。
- init_province(): 第一级 省|直辖市
- init_city(): 第二级 地级市
- init_county(): 第三级 区县
- init_town(): 第四级 乡镇街道
- init_village(): 第四级 自然村/社区
5.1. 核心
import urllib.request
from bs4 import BeautifulSoup
from sip.Area import Area
from util.DbUtil import *
G_HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36'}
G_URL = r'http://www.stats.gov.cn/tjsj/tjbz/tjyqhdmhcxhfdm/2021/index.html'
""" 获取URL前缀 """
G_URL_PREFIX = G_URL[0:G_URL.rindex('/') + 1]
'''
爬取国家统计局网站中 关于行政区划的数据,按照层级进行编辑,最终形成一个树形结构。
包含:省/直辖市/自治区、地级市、区县、乡/镇/街道、居委会/村
'''
def main():
""" 第一级 省|直辖市 """
# init_province()
""" 第二级 地级市 """
# init_city()
""" 第三级 区县 """
# init_county()
""" 第四级 乡镇街道 """
init_town()
""" 第四级 自然村/社区 """
# init_village()
if __name__ == '__main__':
main()
""" Init 执行入口,按照 1、2、3层级顺序执行 """
def get_by_lv(lv):
with UsingMysql(commit=True) as um:
um.cursor.execute(f'select code,name,lv,sup_code,url from areas where lv=%s', lv)
data = um.cursor.fetchall()
return data
""" 由于街道数量量比较大,所以要分页处理 """
def init_village():
do_init_village(4)
def do_init_village(lv):
pag = UsingMysql(numPerPage=20)
sql = r'select code,name,lv,sup_code,url from areas where lv=%s'
param = lv
for ret in pag.queryForList(sql, param):
paramsList = DataToJson(ret)
get_village(paramsList)
""" 由于街道数量量比较大,所以要分页处理 """
def init_town():
do_init_town(3)
# print(get_by_lv(1))
'''
code,name,lv,sup_code,url
'''
def DataToJson(data):
jsonData = []
for row in data:
result = {}
result['code'] = row[0]
result['name'] = row[1]
result['lv'] = row[2]
result['sup_code'] = row[3]
result['url'] = row[4]
jsonData.append(result)
return jsonData
def do_init_town(lv):
pag = UsingMysql(numPerPage=20)
sql = r'select code,name,lv,sup_code,url from areas where lv=%s'
param = lv
for ret in pag.queryForList(sql, param):
paramsList = DataToJson(ret)
# print(len(paramsList))
get_town(paramsList)
'''
批量插入数据库
'''
def batch_insert(_list):
with UsingMysql(commit=True) as um:
um.cursor.executemany(""" INSERT INTO areas (code,name,lv,sup_code,url) VALUES (%s,%s,%s,%s,%s)""",
_list)
def splicing(paramsList):
_list = list()
for el in paramsList:
_list.append((el.code, el.name, el.lv, el.sup_code, el.url))
batch_insert(_list)
'''
数据字典,维护 层级对应的 DIV标签
'''
G_LV_DICT = {1: 'provincetr', 2: 'citytr', 3: 'countytr', 4: 'towntr', 5: 'villagetr'}
'''
核心方法 通用处理方法,封装处理解析逻辑
'''
def commons(_list, lv):
all_area = []
'''
利用父级来遍历子级
'''
for lp in _list:
_url = lp['url']
if _url is None or len(_url) == 0:
continue
else:
req = urllib.request.Request(url=get_to_url(_url, lp['code'], lv), headers=G_HEADERS)
res = urllib.request.urlopen(req)
html = res.read()
soup = BeautifulSoup(html, 'html.parser', from_encoding='gb2312')
all_tr = soup.find_all('tr', attrs={'class': G_LV_DICT[lv]}, limit=30)
for row in all_tr:
if lv == 5:
"""
社区的元素发生变更,需要特殊处理
"""
a_ary = row.find_all('td')
all_area.append(Area(None, a_ary[2].get_text(), a_ary[0].get_text(), lp['code'], lv))
else:
a_ary = row.find_all('a')
if len(a_ary):
all_area.append(
Area(a_ary[0].get('href'), a_ary[1].get_text(), a_ary[0].get_text(), lp['code'], lv))
else:
aa_ary = row.find_all('td')
all_area.append(Area(None, aa_ary[1].get_text(), aa_ary[0].get_text(), lp['code'], lv))
'''
每次批量写入
'''
splicing(all_area)
'''
写完后 置空
'''
all_area = []
'''
性化处理URL地址,并根据层级获取对应URL
'''
def get_to_url(url, code, lv):
urs = {
2: G_URL_PREFIX + url,
3: G_URL_PREFIX + url,
4: G_URL_PREFIX + code[0:2] + '/' + url,
5: G_URL_PREFIX + code[0:2] + '/' + code[2:4] + '/' + url,
}
return urs.get(lv, None)
'''
省/自治区
'''
def init_province():
all_area = []
req = urllib.request.Request(url=G_URL, headers=G_HEADERS)
res = urllib.request.urlopen(req)
html = res.read()
soup = BeautifulSoup(html, 'html.parser', from_encoding='gb2312')
all_tr = soup.find_all('tr', attrs={'class': 'provincetr'}, limit=10)
for row in all_tr:
for r_td in row.find_all('a'):
ars = Area(r_td.get('href'), r_td.get_text(), r_td.get('href')[0:2], '0', 1)
all_area.append(ars)
splicing(all_area)
'''
市
'''
def init_city():
_list = get_by_lv(1)
commons(_list, 2)
'''
区县
'''
def init_county():
_list = get_by_lv(2)
commons(_list, 3)
'''
街道
'''
def get_town(paramsList):
return commons(paramsList, 4)
'''
居委会
'''
def get_village(paramsList):
return commons(paramsList, 5)
5.2. 定义实体
名称 | 说明 |
---|---|
name | 名称 |
code | 编码 |
url | 地址 |
sup_code | 上级编码 |
lv | 层级 |
class Area:
"Area class 行政区域实例"
def __init__(self, url, name, code, sup_code, lv):
self.name = name
self.code = code
self.url = url
self.sup_code = sup_code
self.lv = lv
def __str__(self) -> str:
return 'URL:%s\t NAME:%s\t Code:%s\t SUPER_CODE:%s\t LV:%s\t' % (
self.url, self.name, self.code, self.sup_code, self.lv)
5.3. 数据库操作
import pymysql
import math
""" 用pymysql 操作数据库 """
def get_connection():
host = '127.0.0.1'
port = 13306
db = 'area'
user = 'root'
password = 'xxx'
conn = pymysql.connect(host=host, port=port, db=db, user=user, password=password)
return conn
def initClientEncode(conn):
'''mysql client encoding=utf8'''
curs = conn.cursor()
curs.execute("SET NAMES utf8")
conn.commit()
return curs
""" 使用 with 的方式来优化代码 """
class UsingMysql(object):
def __init__(self, commit=True, log_label=' In total', numPerPage=20):
"""
:param commit: 是否在最后提交事务(设置为False的时候方便单元测试)
:param log_label: 自定义log的文字
"""
self._commit = commit
self._log_label = log_label
self.numPerPage = numPerPage
def queryForList(self, sql, param=None):
totalPageNum = self.__calTotalPages(sql, param)
for pageIndex in range(totalPageNum):
yield self.__queryEachPage(sql, pageIndex, param)
def __createPaginaionQuerySql(self, sql, currentPageIndex):
startIndex = self.__calStartIndex(currentPageIndex)
qSql = r'select * from (%s) total_table limit %s,%s' % (sql, startIndex, self.numPerPage)
return qSql
def __queryEachPage(self, sql, currentPageIndex, param=None):
curs = initClientEncode(get_connection())
qSql = self.__createPaginaionQuerySql(sql, currentPageIndex)
if param is None:
curs.execute(qSql)
else:
curs.execute(qSql, param)
result = curs.fetchall()
curs.close()
return result
def __calTotalRowsNum(self, sql, param=None):
''' 计算总行数 '''
tSql = r'select count(*) from (%s) total_table' % sql
curs = initClientEncode(get_connection())
if param is None:
curs.execute(tSql)
else:
curs.execute(tSql, param)
result = curs.fetchone()
curs.close()
totalRowsNum = 0
if result != None:
totalRowsNum = int(result[0])
return totalRowsNum
def __calTotalPages(self, sql, param):
''' 计算总页数 '''
totalRowsNum = self.__calTotalRowsNum(sql, param)
totalPages = 0
tempTotal = totalRowsNum / self.numPerPage
if (totalRowsNum % self.numPerPage) == 0:
totalPages = tempTotal
else:
totalPages = math.ceil(tempTotal)
return totalPages
def __calStartIndex(self, currentPageIndex):
startIndex = currentPageIndex * self.numPerPage
return startIndex;
def __calLastIndex(self, totalRows, totalPages, currentPageIndex):
'''计算结束时候的索引'''
lastIndex = 0
if totalRows < self.numPerPage:
lastIndex = totalRows
elif ((totalRows % self.numPerPage == 0)
or (totalRows % self.numPerPage != 0 and currentPageIndex < totalPages)):
lastIndex = currentPageIndex * self.numPerPage
elif (totalRows % self.numPerPage != 0 and currentPageIndex == totalPages): # 最后一页
lastIndex = totalRows
return lastIndex
def __enter__(self):
""" 在进入的时候自动获取连接和cursor """
conn = get_connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
conn.autocommit = False
self._conn = conn
self._cursor = cursor
return self
def __exit__(self, *exc_info):
""" 提交事务 """
if self._commit:
self._conn.commit()
""" 在退出的时候自动关闭连接和cursor """
self._cursor.close()
self._conn.close()
@property
def cursor(self):
return self._cursor