Python in Practice

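Summary: This article presents a web crawler written in Python that scrapes question-and-answer data from the Yahoo! Chiebukuro (Yahoo!知恵袋) site and stores it as JSON; a CSV writer is also included. The program uses requests to send HTTP requests, parsel to parse the HTML, and fake_useragent to generate random User-Agent headers.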
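For orientation, the crawl moves through three layers: the category index, each category's paginated list of resolved questions, and each question page with its paginated answers. The sketch below reproduces only the URL and page-count arithmetic used by the program, with no network access. The function names `category_list_url`, `paged_urls` and `answer_page_count` are hypothetical and do not appear in the original code.

```python
from typing import List

MAIN_URL = "https://chiebukuro.yahoo.co.jp"  # same base URL as the crawler's MainUrl


def category_list_url(category_href: str) -> str:
    """Turn a category href into the URL of its resolved-questions list (?flg=1)."""
    return MAIN_URL + category_href + "?flg=1"


def paged_urls(list_url: str, max_pages: int) -> List[str]:
    """Build the URLs of pages 1..max_pages of a category list."""
    return [f"{list_url}&page={page}" for page in range(1, max_pages + 1)]


def answer_page_count(answer_count: int, per_page: int = 5) -> int:
    """Number of answer pages to visit; same arithmetic as (n - 1) // 5 + 1 in the crawler."""
    return (answer_count - 1) // per_page + 1


list_url = category_list_url("/category/2078675272/question/list")
for url in paged_urls(list_url, 2):
    print(url)
print(answer_page_count(11))
```

With 11 other answers and 5 answers per page, the crawler would request three answer pages; with none, the loop over answer pages does not run at all.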
import requests
import re
import os
from parsel import Selector
from fake_useragent import UserAgent
import csv
import time, random
import json

class WisdomBagSearch(object):

    def __init__(self):

        self.MainUrl = "https://chiebukuro.yahoo.co.jp"
        self.MaxNumPage = 2

    def writeHtml(self, url, html):
        with open(url, 'w', encoding='utf-8') as f:
            f.write(str(html))

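    def GetHtml(self, Url):
        # Send a random User-Agent with each request so it looks like an ordinary browser
        header = {
            "user-agent": UserAgent().random
        }
        try:
            # The timeout keeps one stalled connection from hanging the whole crawl
            response = requests.get(Url, headers=header, timeout=10)
            return response.text
        except Exception as e:
            print(e)
            return None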

    def GetCategory(self, firstLayerUrl):
        '''
        :param firstLayerUrl: URL of the first layer (the category index page)
        :return: the href values of all categories
        '''
        # 1. Fetch the HTML of the index page
        totleHtml = self.GetHtml(firstLayerUrl)
        # 2. Select the category links from the HTML
        totleSelector = Selector(text=totleHtml)
        totleTiems = totleSelector.css(
            '.ClapLv2CategoryList_Chie-CategoryList__Category2Wrapper__llQoL a::attr(href)').getall()
        # print(totleTiems)  # print the intermediate result for testing
        # self.writeHtml("./test.txt", totleHtml)  # dump the page source to check that it looks as expected

        # 3. Return the result
        return totleTiems

    def preHanle_categoryHrefS(self, categoryHrefS):
        '''
        :param categoryHrefS: hrefs of all categories
        :return: directly reachable URLs for all categories
        '''
        categoryUrls = []
        for categoryHref in categoryHrefS:
            # ?flg=1 opens the category on its resolved-questions list, i.e. the second layer
            categoryUrl = self.MainUrl + categoryHref + "?flg=1"
            categoryUrls.append(categoryUrl)
        return categoryUrls

    def GetQuestionsUrls(self, categoryUrl):
        '''
        :param categoryUrl: URL of the category (the second layer)
        :return: URLs of all questions within the first MaxNumPage pages
        '''
        # 1. Walk the first MaxNumPage pages of the category, collecting up to MaxNumPage*40 resolved-question URLs

        questionUrls = []

        for i in range(1, self.MaxNumPage + 1):
            try:
                # 2. Fetch the page source of the category at page i
                NewcategoryUrl = categoryUrl + "&page=" + str(i)
                NewcategoryHtml = self.GetHtml(NewcategoryUrl)

                # 3. Parse the source and extract every question link on the current page
                categorySelector = Selector(text=NewcategoryHtml)
                categoryItems = categorySelector.css('.ClapLv3List_Chie-List__ListItem__y_P8W a::attr(href)').getall()
                # print(categoryItems)

                # 4. Add the links to questionUrls
                questionUrls.extend(categoryItems)
            except Exception as e:
                print(e)

            # print(len(questionUrls))  # check that the links were added as expected
        return questionUrls  # flat list of the question URLs collected for this category

    def AnswersNormalization(self, anotherAnswerItems):
        '''
        :param anotherAnswerItems: HTML text of the other answers
        :return: the texts with HTML tags, newlines and carriage returns removed
        '''
        try:
            if len(anotherAnswerItems) > 0:
                anotherAnswerItems = list(anotherAnswerItems)
                for i in range(len(anotherAnswerItems)):
                    anotherAnswerItems[i] = re.sub("<.*?>", '', anotherAnswerItems[i])
                    anotherAnswerItems[i] = anotherAnswerItems[i].replace('\n', '')
                    anotherAnswerItems[i] = anotherAnswerItems[i].replace('\r', '')

                return anotherAnswerItems
        except Exception as e:
            print(type(e), e)
            return None

    def GetUserInfo(self, userInfoUrl):
        '''
        :param userInfoUrl: URL of the Yahoo user's profile page
        :return: the user's login ID and display name
        '''
        # Users with a non-public ID have no reachable profile page, so return placeholders
        if not isinstance(userInfoUrl, str) or not userInfoUrl.startswith(
                'https://chiebukuro.yahoo.co.jp/user'):
            AnswerItems = ['None', 'D非公開さん']
            return AnswerItems

        userHtml = self.GetHtml(userInfoUrl)
        AnswerSelector = Selector(text=userHtml)
        AnswerItems = AnswerSelector.css('.ClapLv2MyProfile_Chie-MyProfile__ContentItem__DfPaV *::text').getall()
        return [AnswerItems[1], AnswerItems[2]]

    def GetUserInfo2(self, answerHtml):
        # Read the answerer's login ID and display name from an answer's HTML fragment
        answerUserInfoSelector = Selector(text=answerHtml)
        answerUserUrl = answerUserInfoSelector.css(
            '.ClapLv2AnswerItem_Chie-AnswerItem__ItemHead__Mvlc0 a::attr(href)').get()
        # The login ID is the last path segment of the profile URL; without a link,
        # fall back to the 'None' placeholder that GetUserInfo uses
        loginnumber = os.path.basename(answerUserUrl) if answerUserUrl else 'None'
        userName = answerUserInfoSelector.css('.ClapLv1UserInfo_Chie-UserInfo__UserName__1bJYU *::text').get()
        return [loginnumber, userName]

    def CheckQuestionStandard(self, questionInfomation):

        return True

    def GetAnswerInfo(self, answerHtml):
        # Parse one answer's HTML fragment: text, time, author, approval count and replies
        answer = {}
        answerSelector = Selector(text=answerHtml)
        answerText = answerSelector.css('.ClapLv1TextBlock_Chie-TextBlock__3X4V5 h2::text').getall()
        NewanswerText = "".join(answerText).replace('\n', '').replace('\r', '')

        answerTime = answerSelector.css('.ClapLv1UserInfo_Chie-UserInfo__Date__2F1LF *::text').get()

        # GetUserInfo2 parses HTML, so pass it the answer fragment rather than the profile URL
        answerUserInfo = self.GetUserInfo2(answerHtml)

        answerApprove = answerSelector.css(
            '.ClapLv1ReactionCounter_Chie-ReactionCounter__Text__1yosc *::text').get()

        answerReplayItems = answerSelector.css(".ClapLv3ReplyList_Chie-ReplyList__Item__33upu").getall()

        replys = []  # replies to this answer
        for answerReplay in answerReplayItems:
            reply = {}
            # Query each reply's own fragment so every reply gets its own text, time and author
            answerReplySelector = Selector(text=answerReplay)
            answerReplyText = answerReplySelector.css(
                '.ClapLv1TextBlock_Chie-TextBlock__3X4V5 *::text').getall()
            NewanswerReplyText = "".join(answerReplyText).replace('\n', '').replace('\r', '')

            answerReplyTime = answerReplySelector.css(
                '.ClapLv1UserInfo_Chie-UserInfo__DateSmall__3erUK *::text').get()

            answerUserReplyUrl = answerReplySelector.css(
                '.ClapLv2ReplyItem_Chie-ReplyItem__ItemHead__HrG5K a::attr(href)').get()
            replyUserName = answerReplySelector.css(
                '.ClapLv1UserInfo_Chie-UserInfo__UserName__1bJYU *::text').get()

            reply['ReplyText'] = NewanswerReplyText
            reply['ReplyTime'] = answerReplyTime
            # The login ID is the last path segment of the profile URL; 'None' if there is no link
            reply['ReplyUserInfo'] = {
                'Loginnumber': os.path.basename(answerUserReplyUrl) if answerUserReplyUrl else 'None',
                'UserName': replyUserName}
            replys.append(reply)

        answer['AnswerText'] = NewanswerText
        answer['AnswerTime'] = answerTime
        answer['AnswerUserInfo'] = {'Loginnumber': answerUserInfo[0],
                                    'UserName': answerUserInfo[1]}

        # A missing approval counter is recorded as 0
        answer['AnswerApprove'] = 0 if answerApprove is None else answerApprove
        answer['AnswerReply'] = replys
        # print(answer)
        return answer

    def GetAnswersInfo(self, answerUrl):
        '''
        :param answerUrl: URL of the question page
        :return: every answer's text, time, user account info and approval count
        '''

        answers = []
        answerHtml = self.GetHtml(answerUrl)
        answerTextSelector = Selector(text=answerHtml)
        answerHtml = answerTextSelector.css('#ba .ClapLv3BestAnswer_Chie-BestAnswer__1kJ7F').get()
        # print(answerHtml)
        baAnswerInfo = self.GetAnswerInfo(answerHtml)
        # Drop the fixed 7-character prefix at the start of the extracted best-answer text
        baAnswerInfo['AnswerText'] = baAnswerInfo['AnswerText'][7:]
        answers.append(baAnswerInfo)
        # print(baAnswerInfo)
        # Number of answer pages to visit; the formula assumes 5 answers per page
        pageReply = (int(self.GetOtherAnswerNumber(answerUrl)) - 1) // 5 + 1

        for i in range(1, pageReply + 1):
            NewPageReplyUrl = answerUrl + '?sort=1&page=' + str(i)
            questionHtml = self.GetHtml(NewPageReplyUrl)
            AnswerSelector = Selector(text=questionHtml)
            AnswerItems = AnswerSelector.css('#ans .ClapLv3AnswerList_Chie-AnswerList__Item__2PxD4').getall()
            # print(AnswerItems)
            for answerHtml in AnswerItems:  # split out every answer on the current page
                answerInfo = self.GetAnswerInfo(answerHtml)
                answers.append(answerInfo)
        return answers

    def GetQuestionMessage(self, questionUrl):
        questionInformation = []  # parsed fields, stored in order: user name, time, answers
        try:
            # 1
            questionHtml = self.GetHtml(questionUrl)

            self.writeHtml('./test12.txt', questionHtml)
            # 2
            authorNameSelector = Selector(text=questionHtml)
            authorUrl = authorNameSelector.css(
                '#que .ClapLv2QuestionItem_Chie-QuestionItem__Head__1ZglB a::attr(href)').get()
            authorInfo = self.GetUserInfo(authorUrl)
            authorInfo = {'Loginnumber': authorInfo[0], 'UserName': authorInfo[1]}

            authorTimeSelector = Selector(text=questionHtml)
            authorTimeItems = authorTimeSelector.re('itemprop="dateCreated".*?>(.*?)</')

            authorQuestionSelector = Selector(text=questionHtml)
            authorQuestionItems = authorQuestionSelector.css(
                '#que .ClapLv1TextBlock_Chie-TextBlock__3X4V5 *::text').getall()
            queText = authorQuestionItems[0]
            queGlan = authorQuestionItems[-2]  # view count of the question

            authorQuestionlabelSelector = Selector(text=questionHtml)  # question labels
            authorQuestionLabelItems = authorQuestionlabelSelector.css(
                '#que .ClapLv2QuestionItem_Chie-QuestionItem__SubAnchor__2Pv8w *::text').getall()

            authorThankSelector = Selector(text=questionHtml)  # asker's thank-you note to the best answerer
            authorThankItems = authorThankSelector.css(
                '.ClapLv3BestAnswer_Chie-BestAnswer__Thanks__1ASeS *::text').getall()

            awardMoneyQuestionSelector = Selector(text=questionHtml)
            awardMoneyQuestionItems = awardMoneyQuestionSelector.css(
                '#que .ClapLv2QuestionItem_Chie-QuestionItem__SubChieCoin__2akxj *::text').getall()
            awardMoney = 0

            if len(awardMoneyQuestionItems) == 2:
                awardMoney = awardMoneyQuestionItems[1]

            baAnswerSelector = Selector(text=questionHtml)
            baAnswerSelectorItems = baAnswerSelector.css('#ba .ClapLv1TextBlock_Chie-TextBlock__3X4V5 *::text').getall()

            texts = []
            texts.append(queText)

            if 0 < len(baAnswerSelectorItems) <= 2:  # special-case whether a best answer exists and whether the asker left a thank-you note
                bestAnswer = baAnswerSelectorItems[1]
                texts.append(bestAnswer)
            else:
                bestAnswer = baAnswerSelectorItems[1]
                thanksAnswer = baAnswerSelectorItems[-3]
                thanksAnswerTime = baAnswerSelectorItems[-1]
                authorTimeItems.append(thanksAnswerTime)
                texts.append(bestAnswer)
                texts.append(thanksAnswer)

            anotherAnswerSelector = Selector(text=questionHtml)
            anotherAnswerItems = anotherAnswerSelector.css('#ans .ClapLv1TextBlock_Chie-TextBlock__3X4V5').getall()

            for anotherAnswer in anotherAnswerItems:
                texts.append(anotherAnswer)

            questionInformation.append(texts[0])  # question text
            questionInformation.append(authorQuestionLabelItems)  # question labels
            questionInformation.append(authorTimeItems[0])  # time the question was asked
            questionInformation.append(authorInfo)  # asker's account info
            questionInformation.append(questionUrl)  # question URL
            questionInformation.append(awardMoney)  # reward offered

            answers = self.GetAnswersInfo(questionUrl)  # answers

            if len(authorThankItems) > 4:  # special-case whether the asker's thank-you note exists
                ThansInfo = {}
                ThansInfo['AnswerText'] = authorThankItems[2].replace('\n', '')
                ThansInfo['AnswerTime'] = authorThankItems[4]
                ThansInfo['AnswerUserInfo'] = authorInfo
                ThansInfo['AnswerApprove'] = 0
                answers.append(ThansInfo)

            questionInformation.append(answers)  # add the answers

            return questionInformation
        except Exception as e:
            # Report the failure and return an empty list so the caller can skip this question
            print(type(e), e)
            return []

    def WriteInCsv(self, csvSaveUrl, questionInformation):
        dataList = []
        # Named 'timestamp' rather than 'time' so the time module is not shadowed
        for question, user, timestamp, text in zip(*questionInformation):
            if user == '' or timestamp == '' or text == '':  # drop dirty rows
                continue
            simpleInfomation = {}
            simpleInfomation['Question'] = question
            simpleInfomation['Time'] = timestamp
            simpleInfomation['UserName'] = user
            simpleInfomation['Text'] = text
            dataList.append(simpleInfomation)

        fieldnames = ['Question', 'Time', 'UserName', 'Text']
        # Explicit UTF-8 so Japanese text is written the same way on every platform
        with open(csvSaveUrl, 'a+', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writerows(dataList)

    def WriteJson(self, jsonUrl, questionInformation):
        fieldnames = ['QuestionText', 'QuestionLabel', 'QuestionTime', 'QuestionerInformation', 'QuestionUrl',
                      'OfferReward', 'Answers']
        messageItem = {}
        try:
            if len(fieldnames) == len(questionInformation):
                for i in range(len(fieldnames)):
                    messageItem[fieldnames[i]] = questionInformation[i]
                # Append one JSON object per question; the context manager closes the file
                with open(jsonUrl, 'a+', encoding='utf-8') as f:
                    json.dump(messageItem, f, indent=2, ensure_ascii=False)
        except Exception as e:
            print(type(e), e)

    def GetCateName(self, categoryUrl):
        categoryHtml = self.GetHtml(categoryUrl)
        cateNameSelector = Selector(text=categoryHtml)
        cateName = cateNameSelector.css('.ClapLv2Title_Chie-Title__TextWrapper__1ccaf h1::text').get()
        return cateName

    def GetOtherAnswerNumber(self, answerUrl):
        '''
        :param answerUrl: URL of the question page
        :return: the number of other answers under this question
        '''
        answerHtml = self.GetHtml(answerUrl)
        answerAnswerSelector = Selector(text=answerHtml)
        answerNumber = answerAnswerSelector.css(
            '.ClapLv2QuestionItem_Chie-QuestionItem__AnswerNumber__3_0RR *::text').get()
        if answerNumber is None:  # special-case a missing answer counter
            answerNumber = 0
        return answerNumber

def main():
    WBSearch = WisdomBagSearch()
    workPath = './Plato44'
    if not os.path.exists(workPath):
        os.mkdir(workPath)
    firstLayerUrl = WBSearch.MainUrl + "/category"
    categoryHrefS = WBSearch.GetCategory(firstLayerUrl)  # hrefs of every category on the index page
    secondLayerUrls = WBSearch.preHanle_categoryHrefS(categoryHrefS)  # turn the hrefs into directly reachable URLs
    print("-- Starting the Yahoo! Chiebukuro crawler --")

    for secondLayerUrl in secondLayerUrls:
        print(secondLayerUrl)
        questionUrls = WBSearch.GetQuestionsUrls(secondLayerUrl)  # question URLs from the first MaxNumPage pages of this category

        categoryName = WBSearch.GetCateName(secondLayerUrl)
        print('Crawling category: ' + categoryName)

        jsonUrl = os.path.join(workPath, categoryName + ".json")
        with open(jsonUrl, 'w', encoding='utf-8'):
            pass  # create (or truncate) the JSON file

        for questionUrl in questionUrls:
            print(questionUrl)
            try:
                time.sleep(random.random())
                # Parse the question page: the asker's name, time and text, plus each answerer's name, time and text
                questionInformation = WBSearch.GetQuestionMessage(questionUrl)
                # WriteInCsv(secondLayerAddess,questionInformation)
                print(questionInformation)
            except Exception as e:
                print(type(e), e)
                continue  # skip this question instead of reusing a stale result

            # GetQuestionMessage returns an empty list when parsing fails
            if questionInformation and WBSearch.CheckQuestionStandard(questionInformation):
                WBSearch.WriteJson(jsonUrl, questionInformation)

def Search(Url):
    WBSearch = WisdomBagSearch()
    workPath = './Plato43'
    secondLayerUrl = Url  # URL of a single category's resolved-questions list
    if not os.path.exists(workPath):
        os.mkdir(workPath)
    print(secondLayerUrl)
    questionUrls = WBSearch.GetQuestionsUrls(secondLayerUrl)  # question URLs from the first MaxNumPage pages of this category

    categoryName = WBSearch.GetCateName(secondLayerUrl)
    print('Crawling category: ' + categoryName)

    jsonUrl = os.path.join(workPath, categoryName + ".json")
    with open(jsonUrl, 'w', encoding='utf-8'):
        pass  # create (or truncate) the JSON file

    for questionUrl in questionUrls:
        print(questionUrl)
        try:
            time.sleep(random.random())
            # Parse the question page: the asker's name, time and text, plus each answerer's name, time and text
            questionInformation = WBSearch.GetQuestionMessage(questionUrl)
            # WriteInCsv(secondLayerAddess,questionInformation)
            print(questionInformation)
        except Exception as e:
            print(type(e), e)
            continue  # skip this question instead of reusing a stale result

        # GetQuestionMessage returns an empty list when parsing fails
        if questionInformation and WBSearch.CheckQuestionStandard(questionInformation):
            WBSearch.WriteJson(jsonUrl, questionInformation)

if __name__ == "__main__":
    main()
    Url = "https://chiebukuro.yahoo.co.jp/category/2078675272/question/list?flg=1"
    #Search(Url)
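
One thing to watch in the output: `WriteJson` appends a separate indented JSON object for every question to the same file, so the finished file holds several concatenated objects and cannot be read back with a single `json.load` call. A minimal sketch of one alternative, assuming a JSON Lines layout (one object per line) is acceptable, follows. The helpers `append_record` and `read_records`, the `.jsonl` file name and the sample records are hypothetical and not part of the original program.

```python
import json
import os
import tempfile
from typing import Any, Dict, Iterator, Sequence

# Same field order as WriteJson in the listing above
FIELDNAMES = ['QuestionText', 'QuestionLabel', 'QuestionTime', 'QuestionerInformation',
              'QuestionUrl', 'OfferReward', 'Answers']


def append_record(path: str, values: Sequence[Any]) -> bool:
    """Append one question as a single JSON line; return False if the shape is wrong."""
    if len(values) != len(FIELDNAMES):
        return False
    record = dict(zip(FIELDNAMES, values))
    with open(path, 'a', encoding='utf-8') as f:
        f.write(json.dumps(record, ensure_ascii=False) + '\n')
    return True


def read_records(path: str) -> Iterator[Dict[str, Any]]:
    """Yield the stored questions one at a time, skipping blank lines."""
    with open(path, encoding='utf-8') as f:
        for line in f:
            if line.strip():
                yield json.loads(line)


# Usage: write two made-up records to a temporary file and read them back
demo_path = os.path.join(tempfile.mkdtemp(), 'demo.jsonl')
user = {'Loginnumber': 'None', 'UserName': 'example'}
append_record(demo_path, ['first question', ['label'], '2024/01/01', user,
                          'https://example.com/q/1', 0, []])
append_record(demo_path, ['second question', [], '2024/01/02', user,
                          'https://example.com/q/2', 0, []])
for record in read_records(demo_path):
    print(record['QuestionText'])
```

Because `json.dumps` escapes newlines inside strings, each record stays on one line, and a crawl that stops partway still leaves every completed line readable.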