Python-20:手写获取行政区划

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Python版爬虫,解析网页中的行政区划的数据,按照 省直辖市/市/区县/乡镇街道/社区

1. 目录

python

spider

2. 背景

很多地方需要用到 统计用区划和城乡划分代码 这块以权威数据为准,但是人家是一个网页。

区划和城乡划分代码

2016 年我用 Java 实现了一版,感觉使用起来不是很方便,后来又在 2020 年 用 Python 又实现了一次,代码量明显少了很大一部分。

3. 设计

3.1. 分析

行政区域的数据结构比较经典,完全是树形数据结构,随着树的深度增大,数据增长在10倍左右。

  • 根节点的数量是 1
  • 省/直辖市 数量为 33
  • 城市 354
  • 区县 3331

3.2. 设计

  • 在实现上,使用了 Pythonpymysql 的类库。
  • 在数据存储上,利用了数据库,在数据库选择上,本着简单易用的原则选择了 MySQL ,数据库表设计如下:

CREATE TABLE areas (  
  code VARCHAR(30) DEFAULT NULL,
  name VARCHAR(100) DEFAULT NULL,
  lv INT(11) DEFAULT NULL,
  sup_code VARCHAR(30) DEFAULT NULL,
  flag VARCHAR(6) DEFAULT NULL,
  url VARCHAR(60) DEFAULT NULL
) ;
  • 因为涉及到对数据库的操作,我这里按照设计思想,对数据库的操作,都封装在一个 DbUtil.py
  • 数据大的过程中,需要考虑分页

4. 环境

  • Pycharm 2021.2.2
  • Python 3.10
  • Anaconda3
插件 说明
pymysql
BeautifulSoup

通过 Python Interpreter 安装 或者 pip 安装都可以,自行视实际情况自行选择 。

20220830172549

5. 实现

AreaMa.py 中,依次执行 init_province()init_city()init_county()init_town()init_village()

  • init_province(): 第一级 省|直辖市
  • init_city(): 第二级 地级市
  • init_county(): 第三级 区县
  • init_town(): 第四级 乡镇街道
  • init_village(): 第四级 自然村/社区

20220903153516

5.1. 核心


import urllib.request

from bs4 import BeautifulSoup
from sip.Area import Area
from util.DbUtil import *

G_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36'}

G_URL = r'http://www.stats.gov.cn/tjsj/tjbz/tjyqhdmhcxhfdm/2021/index.html'
""" 获取URL前缀 """
G_URL_PREFIX = G_URL[0:G_URL.rindex('/') + 1]

'''
爬取国家统计局网站中 关于行政区划的数据,按照层级进行编辑,最终形成一个树形结构。
包含:省/直辖市/自治区、地级市、区县、乡/镇/街道、居委会/村

'''


def main():
    """ 第一级 省|直辖市 """
    # init_province()
    """ 第二级 地级市 """
    # init_city()
    """ 第三级 区县 """
    # init_county()
    """ 第四级 乡镇街道 """
    init_town()
    """ 第四级 自然村/社区 """
    # init_village()


if __name__ == '__main__':
    main()

""" Init 执行入口,按照 1、2、3层级顺序执行 """


def get_by_lv(lv):
    with UsingMysql(commit=True) as um:
        um.cursor.execute(f'select code,name,lv,sup_code,url from areas where lv=%s', lv)
        data = um.cursor.fetchall()
        return data


""" 由于街道数量量比较大,所以要分页处理 """


def init_village():
    do_init_village(4)


def do_init_village(lv):
    pag = UsingMysql(numPerPage=20)
    sql = r'select code,name,lv,sup_code,url from areas where lv=%s'
    param = lv
    for ret in pag.queryForList(sql, param):
        paramsList = DataToJson(ret)
        get_village(paramsList)


""" 由于街道数量量比较大,所以要分页处理 """


def init_town():
    do_init_town(3)
    # print(get_by_lv(1))


'''
code,name,lv,sup_code,url
'''


def DataToJson(data):
    jsonData = []
    for row in data:
        result = {}
        result['code'] = row[0]
        result['name'] = row[1]
        result['lv'] = row[2]
        result['sup_code'] = row[3]
        result['url'] = row[4]
        jsonData.append(result)
    return jsonData


def do_init_town(lv):
    pag = UsingMysql(numPerPage=20)
    sql = r'select code,name,lv,sup_code,url from areas where lv=%s'
    param = lv
    for ret in pag.queryForList(sql, param):
        paramsList = DataToJson(ret)
        # print(len(paramsList))
        get_town(paramsList)


'''
批量插入数据库
'''


def batch_insert(_list):
    with UsingMysql(commit=True) as um:
        um.cursor.executemany(""" INSERT INTO areas (code,name,lv,sup_code,url) VALUES (%s,%s,%s,%s,%s)""",
                              _list)


def splicing(paramsList):
    _list = list()
    for el in paramsList:
        _list.append((el.code, el.name, el.lv, el.sup_code, el.url))
    batch_insert(_list)


'''
数据字典,维护 层级对应的 DIV标签
'''
G_LV_DICT = {1: 'provincetr', 2: 'citytr', 3: 'countytr', 4: 'towntr', 5: 'villagetr'}

'''
核心方法 通用处理方法,封装处理解析逻辑
'''


def commons(_list, lv):
    all_area = []
    '''
    利用父级来遍历子级
    '''
    for lp in _list:
        _url = lp['url']
        if _url is None or len(_url) == 0:
            continue
        else:
            req = urllib.request.Request(url=get_to_url(_url, lp['code'], lv), headers=G_HEADERS)
            res = urllib.request.urlopen(req)
            html = res.read()
            soup = BeautifulSoup(html, 'html.parser', from_encoding='gb2312')
            all_tr = soup.find_all('tr', attrs={'class': G_LV_DICT[lv]}, limit=30)
            for row in all_tr:
                if lv == 5:
                    """ 
                    社区的元素发生变更,需要特殊处理
                    """
                    a_ary = row.find_all('td')
                    all_area.append(Area(None, a_ary[2].get_text(), a_ary[0].get_text(), lp['code'], lv))
                else:
                    a_ary = row.find_all('a')
                    if len(a_ary):
                        all_area.append(
                            Area(a_ary[0].get('href'), a_ary[1].get_text(), a_ary[0].get_text(), lp['code'], lv))
                    else:
                        aa_ary = row.find_all('td')
                        all_area.append(Area(None, aa_ary[1].get_text(), aa_ary[0].get_text(), lp['code'], lv))
            '''
            每次批量写入
            '''
            splicing(all_area)
            '''
            写完后 置空
            '''
            all_area = []


'''
性化处理URL地址,并根据层级获取对应URL
'''


def get_to_url(url, code, lv):
    urs = {
        2: G_URL_PREFIX + url,
        3: G_URL_PREFIX + url,
        4: G_URL_PREFIX + code[0:2] + '/' + url,
        5: G_URL_PREFIX + code[0:2] + '/' + code[2:4] + '/' + url,
    }
    return urs.get(lv, None)


'''
省/自治区
'''


def init_province():
    all_area = []
    req = urllib.request.Request(url=G_URL, headers=G_HEADERS)
    res = urllib.request.urlopen(req)
    html = res.read()
    soup = BeautifulSoup(html, 'html.parser', from_encoding='gb2312')
    all_tr = soup.find_all('tr', attrs={'class': 'provincetr'}, limit=10)
    for row in all_tr:
        for r_td in row.find_all('a'):
            ars = Area(r_td.get('href'), r_td.get_text(), r_td.get('href')[0:2], '0', 1)
            all_area.append(ars)
    splicing(all_area)


'''
市
'''


def init_city():
    _list = get_by_lv(1)
    commons(_list, 2)


'''
区县
'''


def init_county():
    _list = get_by_lv(2)
    commons(_list, 3)


'''
街道
'''


def get_town(paramsList):
    return commons(paramsList, 4)


'''
居委会
'''


def get_village(paramsList):
    return commons(paramsList, 5)


5.2. 定义实体

名称 说明
name 名称
code 编码
url 地址
sup_code 上级编码
lv 层级

class Area:
    "Area class 行政区域实例"

    def __init__(self, url, name, code, sup_code, lv):
        self.name = name
        self.code = code
        self.url = url
        self.sup_code = sup_code
        self.lv = lv

    def __str__(self) -> str:
        return 'URL:%s\t  NAME:%s\t  Code:%s\t SUPER_CODE:%s\t LV:%s\t' % (
            self.url, self.name, self.code, self.sup_code, self.lv)

5.3. 数据库操作


import pymysql
import math

""" 用pymysql 操作数据库 """


def get_connection():
    host = '127.0.0.1'
    port = 13306
    db = 'area'
    user = 'root'
    password = 'xxx'
    conn = pymysql.connect(host=host, port=port, db=db, user=user, password=password)
    return conn


def initClientEncode(conn):
    '''mysql client encoding=utf8'''
    curs = conn.cursor()
    curs.execute("SET NAMES utf8")
    conn.commit()
    return curs


""" 使用 with 的方式来优化代码 """


class UsingMysql(object):

    def __init__(self, commit=True, log_label=' In total', numPerPage=20):
        """
        :param commit: 是否在最后提交事务(设置为False的时候方便单元测试)
        :param log_label:  自定义log的文字
        """
        self._commit = commit
        self._log_label = log_label
        self.numPerPage = numPerPage

    def queryForList(self, sql, param=None):
        totalPageNum = self.__calTotalPages(sql, param)
        for pageIndex in range(totalPageNum):
            yield self.__queryEachPage(sql, pageIndex, param)

    def __createPaginaionQuerySql(self, sql, currentPageIndex):
        startIndex = self.__calStartIndex(currentPageIndex)
        qSql = r'select * from (%s) total_table limit %s,%s' % (sql, startIndex, self.numPerPage)
        return qSql

    def __queryEachPage(self, sql, currentPageIndex, param=None):
        curs = initClientEncode(get_connection())
        qSql = self.__createPaginaionQuerySql(sql, currentPageIndex)
        if param is None:
            curs.execute(qSql)
        else:
            curs.execute(qSql, param)
        result = curs.fetchall()
        curs.close()
        return result

    def __calTotalRowsNum(self, sql, param=None):
        ''' 计算总行数 '''
        tSql = r'select count(*) from (%s) total_table' % sql
        curs = initClientEncode(get_connection())
        if param is None:
            curs.execute(tSql)
        else:
            curs.execute(tSql, param)
        result = curs.fetchone()
        curs.close()
        totalRowsNum = 0
        if result != None:
            totalRowsNum = int(result[0])
        return totalRowsNum

    def __calTotalPages(self, sql, param):
        ''' 计算总页数 '''
        totalRowsNum = self.__calTotalRowsNum(sql, param)
        totalPages = 0
        tempTotal = totalRowsNum / self.numPerPage
        if (totalRowsNum % self.numPerPage) == 0:
            totalPages = tempTotal
        else:
            totalPages = math.ceil(tempTotal)
        return totalPages

    def __calStartIndex(self, currentPageIndex):
        startIndex = currentPageIndex * self.numPerPage
        return startIndex;

    def __calLastIndex(self, totalRows, totalPages, currentPageIndex):
        '''计算结束时候的索引'''
        lastIndex = 0
        if totalRows < self.numPerPage:
            lastIndex = totalRows
        elif ((totalRows % self.numPerPage == 0)
              or (totalRows % self.numPerPage != 0 and currentPageIndex < totalPages)):
            lastIndex = currentPageIndex * self.numPerPage
        elif (totalRows % self.numPerPage != 0 and currentPageIndex == totalPages):  # 最后一页
            lastIndex = totalRows
        return lastIndex

    def __enter__(self):

        """ 在进入的时候自动获取连接和cursor """
        conn = get_connection()
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        conn.autocommit = False

        self._conn = conn
        self._cursor = cursor
        return self

    def __exit__(self, *exc_info):
        """ 提交事务 """
        if self._commit:
            self._conn.commit()
        """ 在退出的时候自动关闭连接和cursor """
        self._cursor.close()
        self._conn.close()

    @property
    def cursor(self):
        return self._cursor


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
6月前
|
Python
Python玫瑰花完整代码
Python玫瑰花完整代码
235 0
520专属——使用Python代码表白究竟能不能成功?
520,谐音:“我爱你”,一直觉得,520真正的意义,不单是用于表达爱,也不是为了收礼物和红包,而是提醒我们,不要忘记爱与被爱。 废话不多说,下面整理了9个效果图和参考代码,感兴趣的朋友可以看看
|
6月前
|
存储 编解码 运维
第二章 Python字符串处理和编码不再发愁
第二章 Python字符串处理和编码不再发愁
|
6月前
|
程序员 iOS开发 MacOS
笨办法学 Python3 第五版(预览)(一)(3)
笨办法学 Python3 第五版(预览)(一)(3)
87 1
|
6月前
|
存储 安全 程序员
笨办法学 Python3 第五版(预览)(一)(4)
笨办法学 Python3 第五版(预览)(一)(4)
60 1
|
6月前
|
Python
笨办法学 Python3 第五版(预览)(二)(1)
笨办法学 Python3 第五版(预览)(二)(1)
55 1
|
6月前
|
数据可视化 定位技术 Python
笨办法学 Python3 第五版(预览)(三)(3)
笨办法学 Python3 第五版(预览)(三)(3)
47 1
|
6月前
|
程序员 iOS开发 MacOS
笨办法学 Python3 第五版(预览)(一)(1)
笨办法学 Python3 第五版(预览)(一)(1)
120 1
|
6月前
|
程序员 Python
笨办法学 Python3 第五版(预览)(三)(1)
笨办法学 Python3 第五版(预览)(三)(1)
48 1
|
6月前
|
存储 程序员 索引
笨办法学 Python3 第五版(预览)(二)(4)
笨办法学 Python3 第五版(预览)(二)(4)
68 1