Python-20:手写获取行政区划

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: Python版爬虫,解析网页中的行政区划的数据,按照 省直辖市/市/区县/乡镇街道/社区

1. 目录

python

spider

2. 背景

很多地方需要用到 统计用区划和城乡划分代码 这块以权威数据为准,但是人家是一个网页。

区划和城乡划分代码

2016 年我用 Java 实现了一版,感觉使用起来不是很方便,后来又在 2020 年 用 Python 又实现了一次,代码量明显少了很大一部分。

3. 设计

3.1. 分析

行政区域的数据结构比较经典,完全是树形数据结构,随着树的深度增大,数据增长在10倍左右。

  • 根节点的数量是 1
  • 省/直辖市 数量为 33
  • 城市 354
  • 区县 3331

3.2. 设计

  • 在实现上,使用了 Pythonpymysql 的类库。
  • 在数据存储上,利用了数据库,在数据库选择上,本着简单易用的原则选择了 MySQL ,数据库表设计如下:

CREATE TABLE areas (  
  code VARCHAR(30) DEFAULT NULL,
  name VARCHAR(100) DEFAULT NULL,
  lv INT(11) DEFAULT NULL,
  sup_code VARCHAR(30) DEFAULT NULL,
  flag VARCHAR(6) DEFAULT NULL,
  url VARCHAR(60) DEFAULT NULL
) ;
  • 因为涉及到对数据库的操作,我这里按照设计思想,对数据库的操作,都封装在一个 DbUtil.py
  • 数据大的过程中,需要考虑分页

4. 环境

  • Pycharm 2021.2.2
  • Python 3.10
  • Anaconda3
插件 说明
pymysql
BeautifulSoup

通过 Python Interpreter 安装 或者 pip 安装都可以,自行视实际情况自行选择 。

20220830172549

5. 实现

AreaMa.py 中,依次执行 init_province()init_city()init_county()init_town()init_village()

  • init_province(): 第一级 省|直辖市
  • init_city(): 第二级 地级市
  • init_county(): 第三级 区县
  • init_town(): 第四级 乡镇街道
  • init_village(): 第四级 自然村/社区

20220903153516

5.1. 核心


import urllib.request

from bs4 import BeautifulSoup
from sip.Area import Area
from util.DbUtil import *

G_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36'}

G_URL = r'http://www.stats.gov.cn/tjsj/tjbz/tjyqhdmhcxhfdm/2021/index.html'
""" 获取URL前缀 """
G_URL_PREFIX = G_URL[0:G_URL.rindex('/') + 1]

'''
爬取国家统计局网站中 关于行政区划的数据,按照层级进行编辑,最终形成一个树形结构。
包含:省/直辖市/自治区、地级市、区县、乡/镇/街道、居委会/村

'''


def main():
    """ 第一级 省|直辖市 """
    # init_province()
    """ 第二级 地级市 """
    # init_city()
    """ 第三级 区县 """
    # init_county()
    """ 第四级 乡镇街道 """
    init_town()
    """ 第四级 自然村/社区 """
    # init_village()


if __name__ == '__main__':
    main()

""" Init 执行入口,按照 1、2、3层级顺序执行 """


def get_by_lv(lv):
    with UsingMysql(commit=True) as um:
        um.cursor.execute(f'select code,name,lv,sup_code,url from areas where lv=%s', lv)
        data = um.cursor.fetchall()
        return data


""" 由于街道数量量比较大,所以要分页处理 """


def init_village():
    do_init_village(4)


def do_init_village(lv):
    pag = UsingMysql(numPerPage=20)
    sql = r'select code,name,lv,sup_code,url from areas where lv=%s'
    param = lv
    for ret in pag.queryForList(sql, param):
        paramsList = DataToJson(ret)
        get_village(paramsList)


""" 由于街道数量量比较大,所以要分页处理 """


def init_town():
    do_init_town(3)
    # print(get_by_lv(1))


'''
code,name,lv,sup_code,url
'''


def DataToJson(data):
    jsonData = []
    for row in data:
        result = {}
        result['code'] = row[0]
        result['name'] = row[1]
        result['lv'] = row[2]
        result['sup_code'] = row[3]
        result['url'] = row[4]
        jsonData.append(result)
    return jsonData


def do_init_town(lv):
    pag = UsingMysql(numPerPage=20)
    sql = r'select code,name,lv,sup_code,url from areas where lv=%s'
    param = lv
    for ret in pag.queryForList(sql, param):
        paramsList = DataToJson(ret)
        # print(len(paramsList))
        get_town(paramsList)


'''
批量插入数据库
'''


def batch_insert(_list):
    with UsingMysql(commit=True) as um:
        um.cursor.executemany(""" INSERT INTO areas (code,name,lv,sup_code,url) VALUES (%s,%s,%s,%s,%s)""",
                              _list)


def splicing(paramsList):
    _list = list()
    for el in paramsList:
        _list.append((el.code, el.name, el.lv, el.sup_code, el.url))
    batch_insert(_list)


'''
数据字典,维护 层级对应的 DIV标签
'''
G_LV_DICT = {1: 'provincetr', 2: 'citytr', 3: 'countytr', 4: 'towntr', 5: 'villagetr'}

'''
核心方法 通用处理方法,封装处理解析逻辑
'''


def commons(_list, lv):
    all_area = []
    '''
    利用父级来遍历子级
    '''
    for lp in _list:
        _url = lp['url']
        if _url is None or len(_url) == 0:
            continue
        else:
            req = urllib.request.Request(url=get_to_url(_url, lp['code'], lv), headers=G_HEADERS)
            res = urllib.request.urlopen(req)
            html = res.read()
            soup = BeautifulSoup(html, 'html.parser', from_encoding='gb2312')
            all_tr = soup.find_all('tr', attrs={'class': G_LV_DICT[lv]}, limit=30)
            for row in all_tr:
                if lv == 5:
                    """ 
                    社区的元素发生变更,需要特殊处理
                    """
                    a_ary = row.find_all('td')
                    all_area.append(Area(None, a_ary[2].get_text(), a_ary[0].get_text(), lp['code'], lv))
                else:
                    a_ary = row.find_all('a')
                    if len(a_ary):
                        all_area.append(
                            Area(a_ary[0].get('href'), a_ary[1].get_text(), a_ary[0].get_text(), lp['code'], lv))
                    else:
                        aa_ary = row.find_all('td')
                        all_area.append(Area(None, aa_ary[1].get_text(), aa_ary[0].get_text(), lp['code'], lv))
            '''
            每次批量写入
            '''
            splicing(all_area)
            '''
            写完后 置空
            '''
            all_area = []


'''
性化处理URL地址,并根据层级获取对应URL
'''


def get_to_url(url, code, lv):
    urs = {
        2: G_URL_PREFIX + url,
        3: G_URL_PREFIX + url,
        4: G_URL_PREFIX + code[0:2] + '/' + url,
        5: G_URL_PREFIX + code[0:2] + '/' + code[2:4] + '/' + url,
    }
    return urs.get(lv, None)


'''
省/自治区
'''


def init_province():
    all_area = []
    req = urllib.request.Request(url=G_URL, headers=G_HEADERS)
    res = urllib.request.urlopen(req)
    html = res.read()
    soup = BeautifulSoup(html, 'html.parser', from_encoding='gb2312')
    all_tr = soup.find_all('tr', attrs={'class': 'provincetr'}, limit=10)
    for row in all_tr:
        for r_td in row.find_all('a'):
            ars = Area(r_td.get('href'), r_td.get_text(), r_td.get('href')[0:2], '0', 1)
            all_area.append(ars)
    splicing(all_area)


'''
市
'''


def init_city():
    _list = get_by_lv(1)
    commons(_list, 2)


'''
区县
'''


def init_county():
    _list = get_by_lv(2)
    commons(_list, 3)


'''
街道
'''


def get_town(paramsList):
    return commons(paramsList, 4)


'''
居委会
'''


def get_village(paramsList):
    return commons(paramsList, 5)


5.2. 定义实体

名称 说明
name 名称
code 编码
url 地址
sup_code 上级编码
lv 层级

class Area:
    "Area class 行政区域实例"

    def __init__(self, url, name, code, sup_code, lv):
        self.name = name
        self.code = code
        self.url = url
        self.sup_code = sup_code
        self.lv = lv

    def __str__(self) -> str:
        return 'URL:%s\t  NAME:%s\t  Code:%s\t SUPER_CODE:%s\t LV:%s\t' % (
            self.url, self.name, self.code, self.sup_code, self.lv)

5.3. 数据库操作


import pymysql
import math

""" 用pymysql 操作数据库 """


def get_connection():
    host = '127.0.0.1'
    port = 13306
    db = 'area'
    user = 'root'
    password = 'xxx'
    conn = pymysql.connect(host=host, port=port, db=db, user=user, password=password)
    return conn


def initClientEncode(conn):
    '''mysql client encoding=utf8'''
    curs = conn.cursor()
    curs.execute("SET NAMES utf8")
    conn.commit()
    return curs


""" 使用 with 的方式来优化代码 """


class UsingMysql(object):

    def __init__(self, commit=True, log_label=' In total', numPerPage=20):
        """
        :param commit: 是否在最后提交事务(设置为False的时候方便单元测试)
        :param log_label:  自定义log的文字
        """
        self._commit = commit
        self._log_label = log_label
        self.numPerPage = numPerPage

    def queryForList(self, sql, param=None):
        totalPageNum = self.__calTotalPages(sql, param)
        for pageIndex in range(totalPageNum):
            yield self.__queryEachPage(sql, pageIndex, param)

    def __createPaginaionQuerySql(self, sql, currentPageIndex):
        startIndex = self.__calStartIndex(currentPageIndex)
        qSql = r'select * from (%s) total_table limit %s,%s' % (sql, startIndex, self.numPerPage)
        return qSql

    def __queryEachPage(self, sql, currentPageIndex, param=None):
        curs = initClientEncode(get_connection())
        qSql = self.__createPaginaionQuerySql(sql, currentPageIndex)
        if param is None:
            curs.execute(qSql)
        else:
            curs.execute(qSql, param)
        result = curs.fetchall()
        curs.close()
        return result

    def __calTotalRowsNum(self, sql, param=None):
        ''' 计算总行数 '''
        tSql = r'select count(*) from (%s) total_table' % sql
        curs = initClientEncode(get_connection())
        if param is None:
            curs.execute(tSql)
        else:
            curs.execute(tSql, param)
        result = curs.fetchone()
        curs.close()
        totalRowsNum = 0
        if result != None:
            totalRowsNum = int(result[0])
        return totalRowsNum

    def __calTotalPages(self, sql, param):
        ''' 计算总页数 '''
        totalRowsNum = self.__calTotalRowsNum(sql, param)
        totalPages = 0
        tempTotal = totalRowsNum / self.numPerPage
        if (totalRowsNum % self.numPerPage) == 0:
            totalPages = tempTotal
        else:
            totalPages = math.ceil(tempTotal)
        return totalPages

    def __calStartIndex(self, currentPageIndex):
        startIndex = currentPageIndex * self.numPerPage
        return startIndex;

    def __calLastIndex(self, totalRows, totalPages, currentPageIndex):
        '''计算结束时候的索引'''
        lastIndex = 0
        if totalRows < self.numPerPage:
            lastIndex = totalRows
        elif ((totalRows % self.numPerPage == 0)
              or (totalRows % self.numPerPage != 0 and currentPageIndex < totalPages)):
            lastIndex = currentPageIndex * self.numPerPage
        elif (totalRows % self.numPerPage != 0 and currentPageIndex == totalPages):  # 最后一页
            lastIndex = totalRows
        return lastIndex

    def __enter__(self):

        """ 在进入的时候自动获取连接和cursor """
        conn = get_connection()
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        conn.autocommit = False

        self._conn = conn
        self._cursor = cursor
        return self

    def __exit__(self, *exc_info):
        """ 提交事务 """
        if self._commit:
            self._conn.commit()
        """ 在退出的时候自动关闭连接和cursor """
        self._cursor.close()
        self._conn.close()

    @property
    def cursor(self):
        return self._cursor


相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
【鸿蒙软件开发】进度条Progress
【鸿蒙软件开发】进度条Progress
560 0
|
关系型数据库 Linux Apache
|
8月前
|
机器学习/深度学习 人工智能 自然语言处理
《Python+AI如何颠覆传统文书审查模式》
在法律领域,法律文书审查传统上依赖人工,耗时且易出错。Python结合AI技术为这一问题提供了高效解决方案。通过自然语言处理(NLP),计算机可精准分析法律文书,包括分词、句法分析、命名实体识别(NER)和文本分类等步骤。这些技术能快速提取关键信息,理解复杂语义,并结合深度学习模型如Transformer提升准确性。实际应用中,高质量数据与专业标注至关重要,同时借助TensorFlow或PyTorch优化模型训练。AI辅助审查不仅提高效率,还助力律师、法官和企业法务更好地应对挑战,推动司法公正与智能化发展。
304 1
|
10月前
|
关系型数据库 MySQL Java
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
|
7月前
|
安全 Ubuntu Linux
硬盘格式化用什么工具好?这6个工具值得了解一下
本文介绍了硬盘格式化的重要性及注意事项,并推荐了几款主流平台下的实用格式化工具,包括Windows磁盘管理器、文件资源管理器、DiskGenius、Diskpart命令行、Mac Disk Utility以及Linux的GParted,帮助用户安全高效地完成格式化操作。
|
机器学习/深度学习 人工智能 算法
从 OpenAI-o1 看大模型的复杂推理能力
深入解析OpenAI o1模型的复杂推理技术与发展历程
从 OpenAI-o1 看大模型的复杂推理能力
|
前端开发 Python
python之【Tkinter模块】
python之【Tkinter模块】
279 5
|
存储 定位技术 API
uniapp获取地理位置的API是什么?
uniapp获取地理位置的API是什么?
601 1
|
存储 Java Linux
linux安装Zookeeper的详细步骤
linux安装Zookeeper的详细步骤
693 5