python实战

简介: 这篇文章提供了一个Python编写的网络爬虫程序,用于爬取Yahoo知惠袋网站的问答数据,并将其存储为JSON和CSV格式,程序使用了requests、parsel、fake_useragent等库来发送请求、解析HTML和模拟用户代理。
import requests
import re
import os
from parsel import Selector
from fake_useragent import UserAgent
import csv
import time, random
import json

class WisdomBagSearch(object):

    def __init__(self):

        self.MainUrl = "https://chiebukuro.yahoo.co.jp"
        self.MaxNumPage = 2

    def writeHtml(self, url, html):
        with open(url, 'w', encoding='utf-8') as f:
            f.write(str(html))

    def GetHtml(self, Url):
        header = {
            "user-agent": UserAgent().random
        }
        try:
            response = requests.get(Url, headers=header)
            return response.text
        except Exception as e:
            print(e)
            return None

    def GetCategory(self, firstLayerUrl):
        '''
        :param firstLayerUrl: 第一层地址
        :return: 返回所有类别的href值
        '''
        # 1,获取主页面的HTML
        totleHtml = self.GetHtml(firstLayerUrl)
        # 2, 获取模式并根据模式对HTML进行分析
        totleSelector = Selector(text=totleHtml)
        totleTiems = totleSelector.css(
            '.ClapLv2CategoryList_Chie-CategoryList__Category2Wrapper__llQoL a::attr(href)').getall()
        # print(totleTiems)
        # print(CategoryHrefdata) 返回结果中途打印测试
        # writeHtml("./test.txt", response.text) #写出爬取网页源码,测试是否符合预期

        # 3,返回分析结果
        return totleTiems

    def preHanle_categoryHrefS(self, categoryHrefS):
        '''
        :param categoryHrefS:  所有类别的href
        :return: 所有类别可以直接访问的的地址
        '''
        categoryUrls = []
        for categoryHref in categoryHrefS:
            categoryUrl = self.MainUrl + categoryHref + "?flg=1"  # ?flg=1表示进去之后,选择已解决的页面,也就是Secondlayer
            categoryUrls.append(categoryUrl)
        return categoryUrls

    def GetQuestionsUrls(self, categoryUrl):
        '''
        :param categoryUrl: 类别的地址,SecondLayer
        :return: MaxNumPage页内的所有问题的Url
        '''
        # 1,对类别遍历一百次,每一个类别取得MaxNumPage*40个已解决的Url

        questionUrls = []

        for i in range(1, self.MaxNumPage + 1):
            try:
                # 2,获取翻页后类别的网页源码
                NewcategoryUrl = categoryUrl + "&page=" + str(i)
                NewcategoryHtml = self.GetHtml(NewcategoryUrl)

                # 3,对源码分析并提取出当前页面所有回答
                categorySelector = Selector(text=NewcategoryHtml)
                categoryItems = categorySelector.css('.ClapLv3List_Chie-List__ListItem__y_P8W a::attr(href)').getall()
                # print(categoryItems)

                # 4,将分析对结果添加进questionUrls
                questionUrls.append(categoryItems)
            except Exception as e:
                print(e)

            # print(len([i for j in questionUrls for i in j])) #测试是否如预期添加成功
        return [i for j in questionUrls for i in j]  # 返回该类别内40000个问题的地址

    def AnswersNormalization(self, anotherAnswerItems):
        '''
        :param anotherAnswerItems: 其他回答的文本
        :return: 格式化,删除换行,超文本链接
        '''
        try:
            if len(anotherAnswerItems) > 0:
                anotherAnswerItems = list(anotherAnswerItems)
                for i in range(len(anotherAnswerItems)):
                    anotherAnswerItems[i] = re.sub("<.*?>", '', anotherAnswerItems[i])
                    anotherAnswerItems[i] = anotherAnswerItems[i].replace('\n', '')
                    anotherAnswerItems[i] = anotherAnswerItems[i].replace('\r', '')

                return anotherAnswerItems
        except Exception as e:
            print(type(e), e)
            return None

    def GetUserInfo(self, userInfoUrl):
        '''
        :param userInfoUrl: yahoo用户地址
        :return: 用户登陆号,名字
        '''
        if type(userInfoUrl) != type('2') or len(userInfoUrl) < 35 or str(
                userInfoUrl[:35]) != 'https://chiebukuro.yahoo.co.jp/user':  # 非公开ID无法访问该用户
            AnswerItems = ['None', 'D非公開さん']
            return AnswerItems

        userHtml = self.GetHtml(userInfoUrl)
        AnswerSelector = Selector(text=userHtml)
        AnswerItems = AnswerSelector.css('.ClapLv2MyProfile_Chie-MyProfile__ContentItem__DfPaV *::text').getall()
        return [AnswerItems[1], AnswerItems[2]]

    def GetUserInfo2(self, answerHtml):
        answerUserInfoSelector = Selector(text=answerHtml)
        answerUserUrl = answerUserInfoSelector.css(
            '.ClapLv2AnswerItem_Chie-AnswerItem__ItemHead__Mvlc0 a::attr(href)').get()
        loginnumber = os.path.basename(answerUserUrl)
        userName = answerUserInfoSelector.css('.ClapLv1UserInfo_Chie-UserInfo__UserName__1bJYU *::text').get()
        print(loginnumber)
        print(userName)
        return [loginnumber, userName]

    def CheckQuestionStandard(self, questionInfomation):

        return True

    def GetAnswerInfo(self, answerHtml):
        answer = {}
        answerTextSelector = Selector(text=answerHtml)
        answerText = answerTextSelector.css('.ClapLv1TextBlock_Chie-TextBlock__3X4V5 h2::text').getall()
        #  print(answerHtml)
        NewanswerText = ""
        for text in answerText:
            NewanswerText += text
        NewanswerText = NewanswerText.replace('\n', '')
        NewanswerText = NewanswerText.replace('\r', '')

        answerTimeSelector = Selector(text=answerHtml)
        answerTime = answerTimeSelector.css('.ClapLv1UserInfo_Chie-UserInfo__Date__2F1LF *::text').get()

        answerUserInfoSelector = Selector(text=answerHtml)
        answerUserUrl = answerUserInfoSelector.css(
            '.ClapLv2AnswerItem_Chie-AnswerItem__ItemHead__Mvlc0 a::attr(href)').get()

        # self.GetUserInfo2(answerHtml)
        # test

        answerUserInfo = self.GetUserInfo2(answerUserUrl)

        answerApprovelSelector = Selector(text=answerHtml)
        answerApprove = answerApprovelSelector.css(
            '.ClapLv1ReactionCounter_Chie-ReactionCounter__Text__1yosc *::text').get()

        answerReplaySelector = Selector(text=answerHtml)
        answerReplayItems = answerReplaySelector.css(".ClapLv3ReplyList_Chie-ReplyList__Item__33upu").getall()

        replys = []  # 答案的回复模块
        # print(len(answerReplayItems))
        self.GetUserInfo2(answerHtml)
        if len(answerReplayItems) != 0:

            for answerReplay in answerReplayItems:
                reply = {}
                self.GetUserInfo2(answerReplay)
                answerReplyTextSelector = Selector(text=answerReplay)
                answerReplyText = answerReplyTextSelector.css(
                    '.ClapLv1TextBlock_Chie-TextBlock__3X4V5 *::text').getall()
                NewanswerReplyText = ""
                for text in answerReplyText:
                    NewanswerReplyText += text
                NewanswerReplyText = NewanswerReplyText.replace('\n', '')
                NewanswerReplyText = NewanswerReplyText.replace('\r', '')

                answerReplyTimeSelector = Selector(text=answerHtml)
                answerReplyTime = answerReplyTimeSelector.css(
                    '.ClapLv1UserInfo_Chie-UserInfo__DateSmall__3erUK *::text').get()

                answerReplyUserInfoSelector = Selector(text=answerHtml)
                answerUserReplyUrl = answerReplyUserInfoSelector.css(
                    '.ClapLv2ReplyItem_Chie-ReplyItem__ItemHead__HrG5K a::attr(href)').get()
                answerUserReplyInfo = self.GetUserInfo2(answerUserReplyUrl)
                answerUserReplyInfoExp = {'Loginnumber': answerUserReplyInfo[0],
                                          'UserName': answerUserReplyInfo[1]}  # 对用户信息进行解析
                reply['ReplyText'] = NewanswerReplyText
                reply['ReplyTime'] = answerReplyTime
                reply['ReplyUserInfo'] = answerUserReplyInfoExp
                replys.append(reply)

        answer['AnswerText'] = NewanswerText
        answer['AnswerTime'] = answerTime
        answer['AnswerUserInfo'] = {'Loginnumber': answerUserInfo[0],
                                    'UserName': answerUserInfo[1]}

        answer['AnswerApprove'] = answerApprove
        answer['AnswerReply'] = replys
        # print(answer)
        if answerApprove == None:
            answer['AnswerApprove'] = 0
        return answer

    def GetAnswersInfo(self, answerUrl):
        '''
        :param answerUrl: 问题的地址
        :return: 所有回答的内容,时间,用户账户信息,点赞数
        '''

        answers = []
        answerHtml = self.GetHtml(answerUrl)
        answerTextSelector = Selector(text=answerHtml)
        answerHtml = answerTextSelector.css('#ba .ClapLv3BestAnswer_Chie-BestAnswer__1kJ7F').get()
        # print(answerHtml)
        baAnswerInfo = self.GetAnswerInfo(answerHtml)
        try:
            baAnswerInfo['AnswerText'] = baAnswerInfo['AnswerText'][7:]

        except Exception as e:
            print(e)
        answers.append(baAnswerInfo)
        # print(baAnswerInfo)
        pageReply = (int(self.GetOtherAnswerNumber(answerUrl)) - 1) // 5 + 1  # 多页访问回复

        for i in range(1, pageReply + 1):
            NewPageReplyUrl = answerUrl + '?sort=1&page=' + str(i)
            questionHtml = self.GetHtml(NewPageReplyUrl)
            AnswerSelector = Selector(text=questionHtml)
            AnswerItems = AnswerSelector.css('#ans .ClapLv3AnswerList_Chie-AnswerList__Item__2PxD4').getall()
            # print(AnswerItems)
            for answerHtml in AnswerItems:  # 切割出当前页全部回答
                answerInfo = self.GetAnswerInfo(answerHtml)
                answers.append(answerInfo)
        return answers

    def GetQuestionMessage(self, questionUrl):
        questionInformation = []  # 将解析出来的信息,按用户名,时间,回答依次存储
        try:
            # 1
            questionHtml = self.GetHtml(questionUrl)

            self.writeHtml('./test12.txt', questionHtml)
            # 2
            authorNameSelector = Selector(text=questionHtml)
            authorUrl = authorNameSelector.css(
                '#que .ClapLv2QuestionItem_Chie-QuestionItem__Head__1ZglB a::attr(href)').get()
            authorInfo = self.GetUserInfo(authorUrl)
            authorInfo = {'Loginnumber': authorInfo[0], 'UserName': authorInfo[1]}

            authorTimeSelector = Selector(text=questionHtml)
            authorTimeItems = authorTimeSelector.re('itemprop="dateCreated".*?>(.*?)</')

            authorQuestionSelector = Selector(text=questionHtml)
            authorQuestionItems = authorQuestionSelector.css(
                '#que .ClapLv1TextBlock_Chie-TextBlock__3X4V5 *::text').getall()
            queText = authorQuestionItems[0]
            queGlan = authorQuestionItems[-2]  # 问题浏览量

            authorQuestionlabelSelector = Selector(text=questionHtml)  # 问题标签
            authorQuestionLabelItems = authorQuestionlabelSelector.css(
                '#que .ClapLv2QuestionItem_Chie-QuestionItem__SubAnchor__2Pv8w *::text').getall()

            authorThankSelector = Selector(text=questionHtml)  # 问题标签
            authorThankItems = authorThankSelector.css(
                '.ClapLv3BestAnswer_Chie-BestAnswer__Thanks__1ASeS *::text').getall()

            awardMoneyQuestionSelector = Selector(text=questionHtml)
            awardMoneyQuestionItems = awardMoneyQuestionSelector.css(
                '#que .ClapLv2QuestionItem_Chie-QuestionItem__SubChieCoin__2akxj *::text').getall()
            awardMoney = 0

            if len(awardMoneyQuestionItems) == 2:
                awardMoney = awardMoneyQuestionItems[1]

            baAnswerSelector = Selector(text=questionHtml)
            baAnswerSelectorItems = baAnswerSelector.css('#ba .ClapLv1TextBlock_Chie-TextBlock__3X4V5 *::text').getall()

            texts = []
            texts.append(queText)

            if 0 < len(baAnswerSelectorItems) <= 2:  # 特判最佳回答是否存在,以及特判是否有提问者感谢内容
                bestAnswer = baAnswerSelectorItems[1]
                texts.append(bestAnswer)
            else:
                bestAnswer = baAnswerSelectorItems[1]
                thanksAnswer = baAnswerSelectorItems[-3]
                thanksAnswerTime = baAnswerSelectorItems[-1]
                authorTimeItems.append(thanksAnswerTime)
                texts.append(bestAnswer)
                texts.append(thanksAnswer)

            anotherAnswerSelector = Selector(text=questionHtml)
            anotherAnswerItems = anotherAnswerSelector.css('#ans .ClapLv1TextBlock_Chie-TextBlock__3X4V5').getall()

            for anotherAnswer in anotherAnswerItems:
                texts.append(anotherAnswer)

            questionInformation.append(texts[0])  # 问题内容
            questionInformation.append(authorQuestionLabelItems)  # 问题标签
            questionInformation.append(authorTimeItems[0])  # 提问时间
            questionInformation.append(authorInfo)  # 提问者账户信息
            questionInformation.append(questionUrl)  # 问题URL
            questionInformation.append(awardMoney)  # 悬赏数

            answers = self.GetAnswersInfo(questionUrl)  # 回答

            if len(authorThankItems) > 4:  # 特判用户答谢部分是否存在
                ThansInfo = {}
                ThansInfo['AnswerText'] = authorThankItems[2].replace('\n', '')
                ThansInfo['AnswerTime'] = authorThankItems[4]
                ThansInfo['AnswerUserInfo'] = authorInfo
                ThansInfo['AnswerApprove'] = 0
                answers.append(ThansInfo)

            questionInformation.append(answers)  # 加入用户回答

            return questionInformation
        except Exception as e:
            # print(e,'bbbb')
            return []

    def WriteInCsv(self, csvSaveUrl, questionInformation):
        dataList = []
        for question, user, time, text in zip(*questionInformation):
            if user == '' or time == '' or text == '':  # 丢弃脏数据
                continue
            simpleInfomation = {}
            simpleInfomation['Question'] = question
            simpleInfomation['Time'] = time
            simpleInfomation['UserName'] = user
            simpleInfomation['Text'] = text
            dataList.append(simpleInfomation)

        fieldnames = ['Question', 'Time', 'UserName', 'Text']
        with open(csvSaveUrl, 'a+', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writerows(dataList)

    def WriteJson(self, jsonUrl, questionInformation):
        fieldnames = ['QuestionText', 'QuestionLabel', 'QuestionTime', 'QuestionerInformation', 'QuestionUrl',
                      'OfferReward', 'Answers']
        messageItem = {}
        try:
            if len(fieldnames) == len(questionInformation):
                for i in range(len(fieldnames)):
                    messageItem[fieldnames[i]] = questionInformation[i]
                json.dump(messageItem, open(jsonUrl, 'a+', encoding='utf-8'), indent=2, ensure_ascii=False)
        except:
            pass

    def GetCateName(self, categoryUrl):
        categoryHtml = self.GetHtml(categoryUrl)
        cateNameSelector = Selector(text=categoryHtml)
        cateName = cateNameSelector.css('.ClapLv2Title_Chie-Title__TextWrapper__1ccaf h1::text').get()
        return cateName

    def GetOtherAnswerNumber(self, answerUrl):
        '''
        :param answerUrl: 问题网页的资源定位符
        :return:该问题下的其他回复数
        '''
        answerHtml = self.GetHtml(answerUrl)
        answerAnswerSelector = Selector(text=answerHtml)
        answerNumber = answerAnswerSelector.css(
            '.ClapLv2QuestionItem_Chie-QuestionItem__AnswerNumber__3_0RR *::text').get()
        if answerNumber == None:  # 特判无法找到的情况
            answerNumber = 0
        return answerNumber

def main():
    WBSearch = WisdomBagSearch()
    workPath = './Plato44'
    if not os.path.exists(workPath):
        os.mkdir(workPath)
    firstLayerUrl = WBSearch.MainUrl + "/category"
    categoryHrefS = WBSearch.GetCategory(firstLayerUrl)  # 获取主页面下全部类别的href
    secondLayerUrls = WBSearch.preHanle_categoryHrefS(categoryHrefS)  # 对全部类别进行预处理成可以直接跳转的URL
    print("--启动YAHOO知惠网网络爬虫--")

    for secondLayerUrl in secondLayerUrls:
        print(secondLayerUrl)
        questionUrls = WBSearch.GetQuestionsUrls(secondLayerUrl)  # 记录一个类别其中前100页的所有能访问的问题URL

        categoryName = WBSearch.GetCateName(secondLayerUrl)
        print('正在爬取类别:' + categoryName)

        with open(os.path.join(workPath, categoryName + ".json"), 'w', encoding='utf-8') as w:
            s = 1  # 创建json文件

        for questionUrl in questionUrls:
            print(questionUrl)
            try:
                time.sleep(random.random())
                questionInformation = WBSearch.GetQuestionMessage(
                    questionUrl)  # 对每个问题下的html进行解析,提取出提问者名字,时间,内容。回答者们的名字,时间,内容
                # WriteInCsv(secondLayerAddess,questionInformation)
                print(questionInformation)
                if questionInformation == 0:
                    continue

            except Exception as e:
                print(type(e), e)

            jsonUrl = os.path.join(workPath, categoryName + ".json")
            if questionInformation != 0:
                if WBSearch.CheckQuestionStandard(questionInformation):
                    WBSearch.WriteJson(jsonUrl, questionInformation)

def Search(Url):
    WBSearch = WisdomBagSearch()
    workPath = './Plato43'
    secondLayerUrl = Url  # 记录一个类别其中前100页的所有能访问的问题URL
    categoryName = WBSearch.GetCateName(secondLayerUrl)
    print(categoryName)
    if not os.path.exists(workPath):
        os.mkdir(workPath)
    print(secondLayerUrl)
    questionUrls = WBSearch.GetQuestionsUrls(secondLayerUrl)  # 记录一个类别其中前100页的所有能访问的问题URL

    categoryName = WBSearch.GetCateName(secondLayerUrl)
    print('正在爬取类别:' + categoryName)

    with open(os.path.join(workPath, categoryName + ".json"), 'w', encoding='utf-8') as w:
        s = 1  # 创建json文件

    for questionUrl in questionUrls:
        print(questionUrl)
        # test
        try:
            time.sleep(random.random())
            questionInformation = WBSearch.GetQuestionMessage(questionUrl)  # 对每个问题下的html进行解析,提取出提问者名字,时间,内容。回答者们的名字,时间,内容
            # WriteInCsv(secondLayerAddess,questionInformation)
            print(questionInformation)
            if questionInformation == 0:
                continue

        except Exception as e:
            print(type(e), e)

        jsonUrl = os.path.join(workPath, categoryName + ".json")
        if questionInformation != 0:
            if WBSearch.CheckQuestionStandard(questionInformation):
                WBSearch.WriteJson(jsonUrl, questionInformation)

if __name__ == "__main__":
    main()
    Url = "https://chiebukuro.yahoo.co.jp/category/2078675272/question/list?flg=1"
    #Search(Url)
相关文章
|
27天前
|
数据采集 数据可视化 数据挖掘
Python数据分析实战:Pandas处理结构化数据的核心技巧
在数据驱动时代,结构化数据是分析决策的基础。Python的Pandas库凭借其高效的数据结构和丰富的功能,成为处理结构化数据的利器。本文通过真实场景和代码示例,讲解Pandas的核心操作,包括数据加载、清洗、转换、分析与性能优化,帮助你从数据中提取有价值的洞察,提升数据处理效率。
103 3
|
27天前
|
数据可视化 Linux iOS开发
Python脚本转EXE文件实战指南:从原理到操作全解析
本教程详解如何将Python脚本打包为EXE文件,涵盖PyInstaller、auto-py-to-exe和cx_Freeze三种工具,包含实战案例与常见问题解决方案,助你轻松发布独立运行的Python程序。
329 2
|
27天前
|
存储 监控 API
Python实战:跨平台电商数据聚合系统的技术实现
本文介绍如何通过标准化API调用协议,实现淘宝、京东、拼多多等电商平台的商品数据自动化采集、清洗与存储。内容涵盖技术架构设计、Python代码示例及高阶应用(如价格监控系统),提供可直接落地的技术方案,帮助开发者解决多平台数据同步难题。
|
2月前
|
数据采集 数据挖掘 测试技术
Go与Python爬虫实战对比:从开发效率到性能瓶颈的深度解析
本文对比了Python与Go在爬虫开发中的特点。Python凭借Scrapy等框架在开发效率和易用性上占优,适合快速开发与中小型项目;而Go凭借高并发和高性能优势,适用于大规模、长期运行的爬虫服务。文章通过代码示例和性能测试,分析了两者在并发能力、错误处理、部署维护等方面的差异,并探讨了未来融合发展的趋势。
157 0
|
2月前
|
IDE 开发工具 数据安全/隐私保护
Python循环嵌套:从入门到实战的完整指南
循环嵌套是Python中处理多维数据和复杂逻辑的重要工具。本文通过实例讲解嵌套循环的基本用法、常见组合、性能优化技巧及实战应用,帮助开发者掌握其核心思想,避免常见错误,并探索替代方案与进阶方向。
104 0
|
2月前
|
机器学习/深度学习 算法 文件存储
神经架构搜索NAS详解:三种核心算法原理与Python实战代码
神经架构搜索(NAS)正被广泛应用于大模型及语言/视觉模型设计,如LangVision-LoRA-NAS、Jet-Nemotron等。本文回顾NAS核心技术,解析其自动化设计原理,探讨强化学习、进化算法与梯度方法的应用与差异,揭示NAS在大模型时代的潜力与挑战。
285 6
神经架构搜索NAS详解:三种核心算法原理与Python实战代码
|
13天前
|
机器学习/深度学习 文字识别 Java
Python实现PDF图片OCR识别:从原理到实战的全流程解析
本文详解2025年Python实现扫描PDF文本提取的四大OCR方案(Tesseract、EasyOCR、PaddleOCR、OCRmyPDF),涵盖环境配置、图像预处理、核心识别与性能优化,结合财务票据、古籍数字化等实战场景,助力高效构建自动化文档处理系统。
212 0
|
12天前
|
小程序 PHP 图形学
热门小游戏源码(Python+PHP)下载-微信小程序游戏源码Unity发实战指南​
本文详解如何结合Python、PHP与Unity开发并部署小游戏至微信小程序。涵盖技术选型、Pygame实战、PHP后端对接、Unity转换适配及性能优化,提供从原型到发布的完整指南,助力开发者快速上手并发布游戏。
|
2月前
|
数据采集 消息中间件 并行计算
Python多线程与多进程性能对比:从原理到实战的深度解析
在Python编程中,多线程与多进程是提升并发性能的关键手段。本文通过实验数据、代码示例和通俗比喻,深入解析两者在不同任务类型下的性能表现,帮助开发者科学选择并发策略,优化程序效率。
116 1
|
14天前
|
JavaScript 前端开发 安全
【逆向】Python 调用 JS 代码实战:使用 pyexecjs 与 Node.js 无缝衔接
本文介绍了如何使用 Python 的轻量级库 `pyexecjs` 调用 JavaScript 代码,并结合 Node.js 实现完整的执行流程。内容涵盖环境搭建、基本使用、常见问题解决方案及爬虫逆向分析中的实战技巧,帮助开发者在 Python 中高效处理 JS 逻辑。

推荐镜像

更多