python实战

简介: 这篇文章提供了一个Python编写的网络爬虫程序,用于爬取Yahoo知惠袋网站的问答数据,并将其存储为JSON和CSV格式,程序使用了requests、parsel、fake_useragent等库来发送请求、解析HTML和模拟用户代理。
import requests
import re
import os
from parsel import Selector
from fake_useragent import UserAgent
import csv
import time, random
import json

class WisdomBagSearch(object):

    def __init__(self):

        self.MainUrl = "https://chiebukuro.yahoo.co.jp"
        self.MaxNumPage = 2

    def writeHtml(self, url, html):
        with open(url, 'w', encoding='utf-8') as f:
            f.write(str(html))

    def GetHtml(self, Url, timeout=10):
        """Request ``Url`` with a random User-Agent and return the response body.

        :param Url: address to request
        :param timeout: seconds to wait for the server before giving up
        :return: the response text, or None when the request could not be completed
        """
        try:
            # The header is built inside the try so that a failure to obtain a
            # user agent is handled on the same best-effort path as a network error.
            header = {
                "user-agent": UserAgent().random
            }
            # requests applies no timeout by default; one stalled connection
            # would otherwise block the whole crawl.
            response = requests.get(Url, headers=header, timeout=timeout)
            return response.text
        except Exception as e:
            print(e)
            return None

    def GetCategory(self, firstLayerUrl):
        '''
        :param firstLayerUrl: 第一层地址
        :return: 返回所有类别的href值
        '''
        # 1,获取主页面的HTML
        totleHtml = self.GetHtml(firstLayerUrl)
        # 2, 获取模式并根据模式对HTML进行分析
        totleSelector = Selector(text=totleHtml)
        totleTiems = totleSelector.css(
            '.ClapLv2CategoryList_Chie-CategoryList__Category2Wrapper__llQoL a::attr(href)').getall()
        # print(totleTiems)
        # print(CategoryHrefdata) 返回结果中途打印测试
        # writeHtml("./test.txt", response.text) #写出爬取网页源码,测试是否符合预期

        # 3,返回分析结果
        return totleTiems

    def preHanle_categoryHrefS(self, categoryHrefS):
        '''
        :param categoryHrefS:  所有类别的href
        :return: 所有类别可以直接访问的的地址
        '''
        categoryUrls = []
        for categoryHref in categoryHrefS:
            categoryUrl = self.MainUrl + categoryHref + "?flg=1"  # ?flg=1表示进去之后,选择已解决的页面,也就是Secondlayer
            categoryUrls.append(categoryUrl)
        return categoryUrls

    def GetQuestionsUrls(self, categoryUrl):
        '''
        :param categoryUrl: 类别的地址,SecondLayer
        :return: MaxNumPage页内的所有问题的Url
        '''
        # 1,对类别遍历一百次,每一个类别取得MaxNumPage*40个已解决的Url

        questionUrls = []

        for i in range(1, self.MaxNumPage + 1):
            try:
                # 2,获取翻页后类别的网页源码
                NewcategoryUrl = categoryUrl + "&page=" + str(i)
                NewcategoryHtml = self.GetHtml(NewcategoryUrl)

                # 3,对源码分析并提取出当前页面所有回答
                categorySelector = Selector(text=NewcategoryHtml)
                categoryItems = categorySelector.css('.ClapLv3List_Chie-List__ListItem__y_P8W a::attr(href)').getall()
                # print(categoryItems)

                # 4,将分析对结果添加进questionUrls
                questionUrls.append(categoryItems)
            except Exception as e:
                print(e)

            # print(len([i for j in questionUrls for i in j])) #测试是否如预期添加成功
        return [i for j in questionUrls for i in j]  # 返回该类别内40000个问题的地址

    def AnswersNormalization(self, anotherAnswerItems):
        '''
        :param anotherAnswerItems: 其他回答的文本
        :return: 格式化,删除换行,超文本链接
        '''
        try:
            if len(anotherAnswerItems) > 0:
                anotherAnswerItems = list(anotherAnswerItems)
                for i in range(len(anotherAnswerItems)):
                    anotherAnswerItems[i] = re.sub("<.*?>", '', anotherAnswerItems[i])
                    anotherAnswerItems[i] = anotherAnswerItems[i].replace('\n', '')
                    anotherAnswerItems[i] = anotherAnswerItems[i].replace('\r', '')

                return anotherAnswerItems
        except Exception as e:
            print(type(e), e)
            return None

    def GetUserInfo(self, userInfoUrl):
        '''
        :param userInfoUrl: yahoo用户地址
        :return: 用户登陆号,名字
        '''
        if type(userInfoUrl) != type('2') or len(userInfoUrl) < 35 or str(
                userInfoUrl[:35]) != 'https://chiebukuro.yahoo.co.jp/user':  # 非公开ID无法访问该用户
            AnswerItems = ['None', 'D非公開さん']
            return AnswerItems

        userHtml = self.GetHtml(userInfoUrl)
        AnswerSelector = Selector(text=userHtml)
        AnswerItems = AnswerSelector.css('.ClapLv2MyProfile_Chie-MyProfile__ContentItem__DfPaV *::text').getall()
        return [AnswerItems[1], AnswerItems[2]]

    def GetUserInfo2(self, answerHtml):
        answerUserInfoSelector = Selector(text=answerHtml)
        answerUserUrl = answerUserInfoSelector.css(
            '.ClapLv2AnswerItem_Chie-AnswerItem__ItemHead__Mvlc0 a::attr(href)').get()
        loginnumber = os.path.basename(answerUserUrl)
        userName = answerUserInfoSelector.css('.ClapLv1UserInfo_Chie-UserInfo__UserName__1bJYU *::text').get()
        print(loginnumber)
        print(userName)
        return [loginnumber, userName]

    def CheckQuestionStandard(self, questionInfomation):

        return True

    def GetAnswerInfo(self, answerHtml):
        """Parse the HTML fragment of one answer into a dict.

        :param answerHtml: HTML of a single answer item
        :return: dict with the keys 'AnswerText', 'AnswerTime', 'AnswerUserInfo'
                 ({'Loginnumber', 'UserName'}), 'AnswerApprove' (the counter
                 text, or 0 when absent) and 'AnswerReply' (list of dicts with
                 'ReplyText', 'ReplyTime' and 'ReplyUserInfo')
        """
        # The fragment is parsed once and queried for every field.
        answerSelector = Selector(text=answerHtml)

        answerText = ''.join(answerSelector.css(
            '.ClapLv1TextBlock_Chie-TextBlock__3X4V5 h2::text').getall())
        answerText = answerText.replace('\n', '').replace('\r', '')

        answerTime = answerSelector.css('.ClapLv1UserInfo_Chie-UserInfo__Date__2F1LF *::text').get()

        answerUserUrl = answerSelector.css(
            '.ClapLv2AnswerItem_Chie-AnswerItem__ItemHead__Mvlc0 a::attr(href)').get()
        if answerUserUrl:
            # GetUserInfo2 parses HTML, so it receives the fragment rather than the profile URL.
            answerUserInfo = self.GetUserInfo2(answerHtml)
        else:
            # No profile link in the answer head: keep whatever name the fragment shows.
            answerUserInfo = ['None', answerSelector.css(
                '.ClapLv1UserInfo_Chie-UserInfo__UserName__1bJYU *::text').get()]

        answerApprove = answerSelector.css(
            '.ClapLv1ReactionCounter_Chie-ReactionCounter__Text__1yosc *::text').get()
        if answerApprove is None:
            answerApprove = 0

        replys = []  # replies attached to this answer
        for replyHtml in answerSelector.css(".ClapLv3ReplyList_Chie-ReplyList__Item__33upu").getall():
            # Every field is read from the reply's own fragment, so each reply
            # carries its own text, time and author.
            replySelector = Selector(text=replyHtml)
            replyText = ''.join(replySelector.css(
                '.ClapLv1TextBlock_Chie-TextBlock__3X4V5 *::text').getall())
            replyText = replyText.replace('\n', '').replace('\r', '')
            replyTime = replySelector.css(
                '.ClapLv1UserInfo_Chie-UserInfo__DateSmall__3erUK *::text').get()
            replyUserUrl = replySelector.css(
                '.ClapLv2ReplyItem_Chie-ReplyItem__ItemHead__HrG5K a::attr(href)').get()
            replyUserName = replySelector.css(
                '.ClapLv1UserInfo_Chie-UserInfo__UserName__1bJYU *::text').get()
            replys.append({
                'ReplyText': replyText,
                'ReplyTime': replyTime,
                'ReplyUserInfo': {
                    # The login id is the last path segment of the profile link, if any.
                    'Loginnumber': os.path.basename(replyUserUrl) if replyUserUrl else 'None',
                    'UserName': replyUserName,
                },
            })

        return {
            'AnswerText': answerText,
            'AnswerTime': answerTime,
            'AnswerUserInfo': {'Loginnumber': answerUserInfo[0],
                               'UserName': answerUserInfo[1]},
            'AnswerApprove': answerApprove,
            'AnswerReply': replys,
        }

    def GetAnswersInfo(self, answerUrl):
        '''
        :param answerUrl: 问题的地址
        :return: 所有回答的内容,时间,用户账户信息,点赞数
        '''

        answers = []
        answerHtml = self.GetHtml(answerUrl)
        answerTextSelector = Selector(text=answerHtml)
        answerHtml = answerTextSelector.css('#ba .ClapLv3BestAnswer_Chie-BestAnswer__1kJ7F').get()
        # print(answerHtml)
        baAnswerInfo = self.GetAnswerInfo(answerHtml)
        try:
            baAnswerInfo['AnswerText'] = baAnswerInfo['AnswerText'][7:]

        except Exception as e:
            print(e)
        answers.append(baAnswerInfo)
        # print(baAnswerInfo)
        pageReply = (int(self.GetOtherAnswerNumber(answerUrl)) - 1) // 5 + 1  # 多页访问回复

        for i in range(1, pageReply + 1):
            NewPageReplyUrl = answerUrl + '?sort=1&page=' + str(i)
            questionHtml = self.GetHtml(NewPageReplyUrl)
            AnswerSelector = Selector(text=questionHtml)
            AnswerItems = AnswerSelector.css('#ans .ClapLv3AnswerList_Chie-AnswerList__Item__2PxD4').getall()
            # print(AnswerItems)
            for answerHtml in AnswerItems:  # 切割出当前页全部回答
                answerInfo = self.GetAnswerInfo(answerHtml)
                answers.append(answerInfo)
        return answers

    def GetQuestionMessage(self, questionUrl):
        questionInformation = []  # 将解析出来的信息,按用户名,时间,回答依次存储
        try:
            # 1
            questionHtml = self.GetHtml(questionUrl)

            self.writeHtml('./test12.txt', questionHtml)
            # 2
            authorNameSelector = Selector(text=questionHtml)
            authorUrl = authorNameSelector.css(
                '#que .ClapLv2QuestionItem_Chie-QuestionItem__Head__1ZglB a::attr(href)').get()
            authorInfo = self.GetUserInfo(authorUrl)
            authorInfo = {'Loginnumber': authorInfo[0], 'UserName': authorInfo[1]}

            authorTimeSelector = Selector(text=questionHtml)
            authorTimeItems = authorTimeSelector.re('itemprop="dateCreated".*?>(.*?)</')

            authorQuestionSelector = Selector(text=questionHtml)
            authorQuestionItems = authorQuestionSelector.css(
                '#que .ClapLv1TextBlock_Chie-TextBlock__3X4V5 *::text').getall()
            queText = authorQuestionItems[0]
            queGlan = authorQuestionItems[-2]  # 问题浏览量

            authorQuestionlabelSelector = Selector(text=questionHtml)  # 问题标签
            authorQuestionLabelItems = authorQuestionlabelSelector.css(
                '#que .ClapLv2QuestionItem_Chie-QuestionItem__SubAnchor__2Pv8w *::text').getall()

            authorThankSelector = Selector(text=questionHtml)  # 问题标签
            authorThankItems = authorThankSelector.css(
                '.ClapLv3BestAnswer_Chie-BestAnswer__Thanks__1ASeS *::text').getall()

            awardMoneyQuestionSelector = Selector(text=questionHtml)
            awardMoneyQuestionItems = awardMoneyQuestionSelector.css(
                '#que .ClapLv2QuestionItem_Chie-QuestionItem__SubChieCoin__2akxj *::text').getall()
            awardMoney = 0

            if len(awardMoneyQuestionItems) == 2:
                awardMoney = awardMoneyQuestionItems[1]

            baAnswerSelector = Selector(text=questionHtml)
            baAnswerSelectorItems = baAnswerSelector.css('#ba .ClapLv1TextBlock_Chie-TextBlock__3X4V5 *::text').getall()

            texts = []
            texts.append(queText)

            if 0 < len(baAnswerSelectorItems) <= 2:  # 特判最佳回答是否存在,以及特判是否有提问者感谢内容
                bestAnswer = baAnswerSelectorItems[1]
                texts.append(bestAnswer)
            else:
                bestAnswer = baAnswerSelectorItems[1]
                thanksAnswer = baAnswerSelectorItems[-3]
                thanksAnswerTime = baAnswerSelectorItems[-1]
                authorTimeItems.append(thanksAnswerTime)
                texts.append(bestAnswer)
                texts.append(thanksAnswer)

            anotherAnswerSelector = Selector(text=questionHtml)
            anotherAnswerItems = anotherAnswerSelector.css('#ans .ClapLv1TextBlock_Chie-TextBlock__3X4V5').getall()

            for anotherAnswer in anotherAnswerItems:
                texts.append(anotherAnswer)

            questionInformation.append(texts[0])  # 问题内容
            questionInformation.append(authorQuestionLabelItems)  # 问题标签
            questionInformation.append(authorTimeItems[0])  # 提问时间
            questionInformation.append(authorInfo)  # 提问者账户信息
            questionInformation.append(questionUrl)  # 问题URL
            questionInformation.append(awardMoney)  # 悬赏数

            answers = self.GetAnswersInfo(questionUrl)  # 回答

            if len(authorThankItems) > 4:  # 特判用户答谢部分是否存在
                ThansInfo = {}
                ThansInfo['AnswerText'] = authorThankItems[2].replace('\n', '')
                ThansInfo['AnswerTime'] = authorThankItems[4]
                ThansInfo['AnswerUserInfo'] = authorInfo
                ThansInfo['AnswerApprove'] = 0
                answers.append(ThansInfo)

            questionInformation.append(answers)  # 加入用户回答

            return questionInformation
        except Exception as e:
            # print(e,'bbbb')
            return []

    def WriteInCsv(self, csvSaveUrl, questionInformation):
        dataList = []
        for question, user, time, text in zip(*questionInformation):
            if user == '' or time == '' or text == '':  # 丢弃脏数据
                continue
            simpleInfomation = {}
            simpleInfomation['Question'] = question
            simpleInfomation['Time'] = time
            simpleInfomation['UserName'] = user
            simpleInfomation['Text'] = text
            dataList.append(simpleInfomation)

        fieldnames = ['Question', 'Time', 'UserName', 'Text']
        with open(csvSaveUrl, 'a+', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writerows(dataList)

    def WriteJson(self, jsonUrl, questionInformation):
        fieldnames = ['QuestionText', 'QuestionLabel', 'QuestionTime', 'QuestionerInformation', 'QuestionUrl',
                      'OfferReward', 'Answers']
        messageItem = {}
        try:
            if len(fieldnames) == len(questionInformation):
                for i in range(len(fieldnames)):
                    messageItem[fieldnames[i]] = questionInformation[i]
                json.dump(messageItem, open(jsonUrl, 'a+', encoding='utf-8'), indent=2, ensure_ascii=False)
        except:
            pass

    def GetCateName(self, categoryUrl):
        categoryHtml = self.GetHtml(categoryUrl)
        cateNameSelector = Selector(text=categoryHtml)
        cateName = cateNameSelector.css('.ClapLv2Title_Chie-Title__TextWrapper__1ccaf h1::text').get()
        return cateName

    def GetOtherAnswerNumber(self, answerUrl):
        '''
        :param answerUrl: 问题网页的资源定位符
        :return:该问题下的其他回复数
        '''
        answerHtml = self.GetHtml(answerUrl)
        answerAnswerSelector = Selector(text=answerHtml)
        answerNumber = answerAnswerSelector.css(
            '.ClapLv2QuestionItem_Chie-QuestionItem__AnswerNumber__3_0RR *::text').get()
        if answerNumber == None:  # 特判无法找到的情况
            answerNumber = 0
        return answerNumber

def main():
    """Crawl every category on the category index and save each category's questions.

    One file named after the category is written under ./Plato44; it is
    emptied at the start of the category and then appended to per question.
    """
    WBSearch = WisdomBagSearch()
    workPath = './Plato44'
    os.makedirs(workPath, exist_ok=True)
    firstLayerUrl = WBSearch.MainUrl + "/category"
    categoryHrefS = WBSearch.GetCategory(firstLayerUrl)  # href of every category on the index page
    secondLayerUrls = WBSearch.preHanle_categoryHrefS(categoryHrefS)  # directly requestable category URLs
    print("--启动YAHOO知惠网网络爬虫--")

    for secondLayerUrl in secondLayerUrls:
        print(secondLayerUrl)

        categoryName = WBSearch.GetCateName(secondLayerUrl)
        if not categoryName:
            # Without a name there is no file to write to; move on to the next category.
            print('category name not found, skipped:', secondLayerUrl)
            continue
        print('正在爬取类别:' + categoryName)

        # Question URLs from the first MaxNumPage listing pages of this category.
        questionUrls = WBSearch.GetQuestionsUrls(secondLayerUrl)

        # The name comes from page text; replace characters that are not valid in file names.
        safeName = re.sub(r'[\\/:*?"<>|]', '_', categoryName.strip())
        jsonUrl = os.path.join(workPath, safeName + ".json")
        # Start the category with an empty file; WriteJson appends to it.
        with open(jsonUrl, 'w', encoding='utf-8'):
            pass

        for questionUrl in questionUrls:
            print(questionUrl)
            try:
                time.sleep(random.random())  # random pause of under a second between requests
                questionInformation = WBSearch.GetQuestionMessage(questionUrl)
                print(questionInformation)
            except Exception as e:
                print(type(e), e)
                # Nothing was parsed for this question, so there is nothing to write.
                continue

            # GetQuestionMessage returns an empty list for a question it could not parse.
            if not questionInformation:
                continue
            if WBSearch.CheckQuestionStandard(questionInformation):
                WBSearch.WriteJson(jsonUrl, questionInformation)

def Search(Url):
    """Crawl a single category and save its questions under ./Plato43.

    :param Url: second-layer category listing address
    """
    WBSearch = WisdomBagSearch()
    workPath = './Plato43'
    secondLayerUrl = Url
    # The category name is fetched once and reused for the log line and the file name.
    categoryName = WBSearch.GetCateName(secondLayerUrl)
    print(categoryName)
    os.makedirs(workPath, exist_ok=True)
    print(secondLayerUrl)
    if not categoryName:
        # Without a name there is no file to write to.
        print('category name not found, skipped:', secondLayerUrl)
        return

    # Question URLs from the first MaxNumPage listing pages of this category.
    questionUrls = WBSearch.GetQuestionsUrls(secondLayerUrl)
    print('正在爬取类别:' + categoryName)

    # The name comes from page text; replace characters that are not valid in file names.
    safeName = re.sub(r'[\\/:*?"<>|]', '_', categoryName.strip())
    jsonUrl = os.path.join(workPath, safeName + ".json")
    # Start with an empty file; WriteJson appends to it.
    with open(jsonUrl, 'w', encoding='utf-8'):
        pass

    for questionUrl in questionUrls:
        print(questionUrl)
        try:
            time.sleep(random.random())  # random pause of under a second between requests
            questionInformation = WBSearch.GetQuestionMessage(questionUrl)
            print(questionInformation)
        except Exception as e:
            print(type(e), e)
            # Nothing was parsed for this question, so there is nothing to write.
            continue

        # GetQuestionMessage returns an empty list for a question it could not parse.
        if not questionInformation:
            continue
        if WBSearch.CheckQuestionStandard(questionInformation):
            WBSearch.WriteJson(jsonUrl, questionInformation)

if __name__ == "__main__":
    # Default entry point when run as a script: crawl every category.
    main()
    # Sample category listing address for a single-category run through Search;
    # that call is left disabled below.
    Url = "https://chiebukuro.yahoo.co.jp/category/2078675272/question/list?flg=1"
    #Search(Url)
相关文章
|
1月前
|
SQL 关系型数据库 数据库
Python SQLAlchemy模块:从入门到实战的数据库操作指南
免费提供Python+PyCharm编程环境,结合SQLAlchemy ORM框架详解数据库开发。涵盖连接配置、模型定义、CRUD操作、事务控制及Alembic迁移工具,以电商订单系统为例,深入讲解高并发场景下的性能优化与最佳实践,助你高效构建数据驱动应用。
287 7
|
1月前
|
数据采集 Web App开发 数据安全/隐私保护
实战:Python爬虫如何模拟登录与维持会话状态
实战:Python爬虫如何模拟登录与维持会话状态
|
1月前
|
传感器 运维 前端开发
Python离群值检测实战:使用distfit库实现基于分布拟合的异常检测
本文解析异常(anomaly)与新颖性(novelty)检测的本质差异,结合distfit库演示基于概率密度拟合的单变量无监督异常检测方法,涵盖全局、上下文与集体离群值识别,助力构建高可解释性模型。
302 10
Python离群值检测实战:使用distfit库实现基于分布拟合的异常检测
|
1月前
|
数据采集 监控 数据库
Python异步编程实战:爬虫案例
🌟 蒋星熠Jaxonic,代码为舟的星际旅人。从回调地狱到async/await协程天堂,亲历Python异步编程演进。分享高性能爬虫、数据库异步操作、限流监控等实战经验,助你驾驭并发,在二进制星河中谱写极客诗篇。
Python异步编程实战:爬虫案例
|
1月前
|
Cloud Native 算法 API
Python API接口实战指南:从入门到精通
🌟蒋星熠Jaxonic,技术宇宙的星际旅人。深耕API开发,以Python为舟,探索RESTful、GraphQL等接口奥秘。擅长requests、aiohttp实战,专注性能优化与架构设计,用代码连接万物,谱写极客诗篇。
Python API接口实战指南:从入门到精通
|
1月前
|
存储 分布式计算 测试技术
Python学习之旅:从基础到实战第三章
总体来说,第三章是Python学习路程中的一个重要里程碑,它不仅加深了对基础概念的理解,还引入了更多高级特性,为后续的深入学习和实际应用打下坚实的基础。通过这一章的学习,读者应该能够更好地理解Python编程的核心概念,并准备好应对更复杂的编程挑战。
105 12
|
2月前
|
数据采集 存储 XML
Python爬虫技术:从基础到实战的完整教程
最后强调:须在法律法规允许的范围内进行网络抓取活动;不得侵犯他人的版权和隐私权益;同时也要注意个人安全,防止泄露敏感信息。
676 19
|
1月前
|
存储 数据采集 监控
Python文件操作全攻略:从基础到高级实战
本文系统讲解Python文件操作核心技巧,涵盖基础读写、指针控制、异常处理及大文件分块处理等实战场景。结合日志分析、CSV清洗等案例,助你高效掌握文本与二进制文件处理,提升程序健壮性与开发效率。(238字)
275 1
|
1月前
|
存储 Java 调度
Python定时任务实战:APScheduler从入门到精通
APScheduler是Python强大的定时任务框架,通过触发器、执行器、任务存储和调度器四大组件,灵活实现各类周期性任务。支持内存、数据库、Redis等持久化存储,适用于Web集成、数据抓取、邮件发送等场景,解决传统sleep循环的诸多缺陷,助力构建稳定可靠的自动化系统。(238字)
473 1
|
2月前
|
设计模式 人工智能 API
AI智能体开发实战:17种核心架构模式详解与Python代码实现
本文系统解析17种智能体架构设计模式,涵盖多智能体协作、思维树、反思优化与工具调用等核心范式,结合LangChain与LangGraph实现代码工作流,并通过真实案例验证效果,助力构建高效AI系统。
424 7

推荐镜像

更多