python实战

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 这篇文章提供了一个Python编写的网络爬虫程序,用于爬取Yahoo知惠袋网站的问答数据,并将其存储为JSON和CSV格式,程序使用了requests、parsel、fake_useragent等库来发送请求、解析HTML和模拟用户代理。
import requests
import re
import os
from parsel import Selector
from fake_useragent import UserAgent
import csv
import time, random
import json

class WisdomBagSearch(object):

    def __init__(self):

        self.MainUrl = "https://chiebukuro.yahoo.co.jp"
        self.MaxNumPage = 2

    def writeHtml(self, url, html):
        with open(url, 'w', encoding='utf-8') as f:
            f.write(str(html))

    def GetHtml(self, Url):
        header = {
            "user-agent": UserAgent().random
        }
        try:
            response = requests.get(Url, headers=header)
            return response.text
        except Exception as e:
            print(e)
            return None

    def GetCategory(self, firstLayerUrl):
        '''
        :param firstLayerUrl: 第一层地址
        :return: 返回所有类别的href值
        '''
        # 1,获取主页面的HTML
        totleHtml = self.GetHtml(firstLayerUrl)
        # 2, 获取模式并根据模式对HTML进行分析
        totleSelector = Selector(text=totleHtml)
        totleTiems = totleSelector.css(
            '.ClapLv2CategoryList_Chie-CategoryList__Category2Wrapper__llQoL a::attr(href)').getall()
        # print(totleTiems)
        # print(CategoryHrefdata) 返回结果中途打印测试
        # writeHtml("./test.txt", response.text) #写出爬取网页源码,测试是否符合预期

        # 3,返回分析结果
        return totleTiems

    def preHanle_categoryHrefS(self, categoryHrefS):
        '''
        :param categoryHrefS:  所有类别的href
        :return: 所有类别可以直接访问的的地址
        '''
        categoryUrls = []
        for categoryHref in categoryHrefS:
            categoryUrl = self.MainUrl + categoryHref + "?flg=1"  # ?flg=1表示进去之后,选择已解决的页面,也就是Secondlayer
            categoryUrls.append(categoryUrl)
        return categoryUrls

    def GetQuestionsUrls(self, categoryUrl):
        '''
        :param categoryUrl: 类别的地址,SecondLayer
        :return: MaxNumPage页内的所有问题的Url
        '''
        # 1,对类别遍历一百次,每一个类别取得MaxNumPage*40个已解决的Url

        questionUrls = []

        for i in range(1, self.MaxNumPage + 1):
            try:
                # 2,获取翻页后类别的网页源码
                NewcategoryUrl = categoryUrl + "&page=" + str(i)
                NewcategoryHtml = self.GetHtml(NewcategoryUrl)

                # 3,对源码分析并提取出当前页面所有回答
                categorySelector = Selector(text=NewcategoryHtml)
                categoryItems = categorySelector.css('.ClapLv3List_Chie-List__ListItem__y_P8W a::attr(href)').getall()
                # print(categoryItems)

                # 4,将分析对结果添加进questionUrls
                questionUrls.append(categoryItems)
            except Exception as e:
                print(e)

            # print(len([i for j in questionUrls for i in j])) #测试是否如预期添加成功
        return [i for j in questionUrls for i in j]  # 返回该类别内40000个问题的地址

    def AnswersNormalization(self, anotherAnswerItems):
        '''
        :param anotherAnswerItems: 其他回答的文本
        :return: 格式化,删除换行,超文本链接
        '''
        try:
            if len(anotherAnswerItems) > 0:
                anotherAnswerItems = list(anotherAnswerItems)
                for i in range(len(anotherAnswerItems)):
                    anotherAnswerItems[i] = re.sub("<.*?>", '', anotherAnswerItems[i])
                    anotherAnswerItems[i] = anotherAnswerItems[i].replace('\n', '')
                    anotherAnswerItems[i] = anotherAnswerItems[i].replace('\r', '')

                return anotherAnswerItems
        except Exception as e:
            print(type(e), e)
            return None

    def GetUserInfo(self, userInfoUrl):
        '''
        :param userInfoUrl: yahoo用户地址
        :return: 用户登陆号,名字
        '''
        if type(userInfoUrl) != type('2') or len(userInfoUrl) < 35 or str(
                userInfoUrl[:35]) != 'https://chiebukuro.yahoo.co.jp/user':  # 非公开ID无法访问该用户
            AnswerItems = ['None', 'D非公開さん']
            return AnswerItems

        userHtml = self.GetHtml(userInfoUrl)
        AnswerSelector = Selector(text=userHtml)
        AnswerItems = AnswerSelector.css('.ClapLv2MyProfile_Chie-MyProfile__ContentItem__DfPaV *::text').getall()
        return [AnswerItems[1], AnswerItems[2]]

    def GetUserInfo2(self, answerHtml):
        answerUserInfoSelector = Selector(text=answerHtml)
        answerUserUrl = answerUserInfoSelector.css(
            '.ClapLv2AnswerItem_Chie-AnswerItem__ItemHead__Mvlc0 a::attr(href)').get()
        loginnumber = os.path.basename(answerUserUrl)
        userName = answerUserInfoSelector.css('.ClapLv1UserInfo_Chie-UserInfo__UserName__1bJYU *::text').get()
        print(loginnumber)
        print(userName)
        return [loginnumber, userName]

    def CheckQuestionStandard(self, questionInfomation):

        return True

    def GetAnswerInfo(self, answerHtml):
        answer = {}
        answerTextSelector = Selector(text=answerHtml)
        answerText = answerTextSelector.css('.ClapLv1TextBlock_Chie-TextBlock__3X4V5 h2::text').getall()
        #  print(answerHtml)
        NewanswerText = ""
        for text in answerText:
            NewanswerText += text
        NewanswerText = NewanswerText.replace('\n', '')
        NewanswerText = NewanswerText.replace('\r', '')

        answerTimeSelector = Selector(text=answerHtml)
        answerTime = answerTimeSelector.css('.ClapLv1UserInfo_Chie-UserInfo__Date__2F1LF *::text').get()

        answerUserInfoSelector = Selector(text=answerHtml)
        answerUserUrl = answerUserInfoSelector.css(
            '.ClapLv2AnswerItem_Chie-AnswerItem__ItemHead__Mvlc0 a::attr(href)').get()

        # self.GetUserInfo2(answerHtml)
        # test

        answerUserInfo = self.GetUserInfo2(answerUserUrl)

        answerApprovelSelector = Selector(text=answerHtml)
        answerApprove = answerApprovelSelector.css(
            '.ClapLv1ReactionCounter_Chie-ReactionCounter__Text__1yosc *::text').get()

        answerReplaySelector = Selector(text=answerHtml)
        answerReplayItems = answerReplaySelector.css(".ClapLv3ReplyList_Chie-ReplyList__Item__33upu").getall()

        replys = []  # 答案的回复模块
        # print(len(answerReplayItems))
        self.GetUserInfo2(answerHtml)
        if len(answerReplayItems) != 0:

            for answerReplay in answerReplayItems:
                reply = {}
                self.GetUserInfo2(answerReplay)
                answerReplyTextSelector = Selector(text=answerReplay)
                answerReplyText = answerReplyTextSelector.css(
                    '.ClapLv1TextBlock_Chie-TextBlock__3X4V5 *::text').getall()
                NewanswerReplyText = ""
                for text in answerReplyText:
                    NewanswerReplyText += text
                NewanswerReplyText = NewanswerReplyText.replace('\n', '')
                NewanswerReplyText = NewanswerReplyText.replace('\r', '')

                answerReplyTimeSelector = Selector(text=answerHtml)
                answerReplyTime = answerReplyTimeSelector.css(
                    '.ClapLv1UserInfo_Chie-UserInfo__DateSmall__3erUK *::text').get()

                answerReplyUserInfoSelector = Selector(text=answerHtml)
                answerUserReplyUrl = answerReplyUserInfoSelector.css(
                    '.ClapLv2ReplyItem_Chie-ReplyItem__ItemHead__HrG5K a::attr(href)').get()
                answerUserReplyInfo = self.GetUserInfo2(answerUserReplyUrl)
                answerUserReplyInfoExp = {'Loginnumber': answerUserReplyInfo[0],
                                          'UserName': answerUserReplyInfo[1]}  # 对用户信息进行解析
                reply['ReplyText'] = NewanswerReplyText
                reply['ReplyTime'] = answerReplyTime
                reply['ReplyUserInfo'] = answerUserReplyInfoExp
                replys.append(reply)

        answer['AnswerText'] = NewanswerText
        answer['AnswerTime'] = answerTime
        answer['AnswerUserInfo'] = {'Loginnumber': answerUserInfo[0],
                                    'UserName': answerUserInfo[1]}

        answer['AnswerApprove'] = answerApprove
        answer['AnswerReply'] = replys
        # print(answer)
        if answerApprove == None:
            answer['AnswerApprove'] = 0
        return answer

    def GetAnswersInfo(self, answerUrl):
        '''
        :param answerUrl: 问题的地址
        :return: 所有回答的内容,时间,用户账户信息,点赞数
        '''

        answers = []
        answerHtml = self.GetHtml(answerUrl)
        answerTextSelector = Selector(text=answerHtml)
        answerHtml = answerTextSelector.css('#ba .ClapLv3BestAnswer_Chie-BestAnswer__1kJ7F').get()
        # print(answerHtml)
        baAnswerInfo = self.GetAnswerInfo(answerHtml)
        try:
            baAnswerInfo['AnswerText'] = baAnswerInfo['AnswerText'][7:]

        except Exception as e:
            print(e)
        answers.append(baAnswerInfo)
        # print(baAnswerInfo)
        pageReply = (int(self.GetOtherAnswerNumber(answerUrl)) - 1) // 5 + 1  # 多页访问回复

        for i in range(1, pageReply + 1):
            NewPageReplyUrl = answerUrl + '?sort=1&page=' + str(i)
            questionHtml = self.GetHtml(NewPageReplyUrl)
            AnswerSelector = Selector(text=questionHtml)
            AnswerItems = AnswerSelector.css('#ans .ClapLv3AnswerList_Chie-AnswerList__Item__2PxD4').getall()
            # print(AnswerItems)
            for answerHtml in AnswerItems:  # 切割出当前页全部回答
                answerInfo = self.GetAnswerInfo(answerHtml)
                answers.append(answerInfo)
        return answers

    def GetQuestionMessage(self, questionUrl):
        questionInformation = []  # 将解析出来的信息,按用户名,时间,回答依次存储
        try:
            # 1
            questionHtml = self.GetHtml(questionUrl)

            self.writeHtml('./test12.txt', questionHtml)
            # 2
            authorNameSelector = Selector(text=questionHtml)
            authorUrl = authorNameSelector.css(
                '#que .ClapLv2QuestionItem_Chie-QuestionItem__Head__1ZglB a::attr(href)').get()
            authorInfo = self.GetUserInfo(authorUrl)
            authorInfo = {'Loginnumber': authorInfo[0], 'UserName': authorInfo[1]}

            authorTimeSelector = Selector(text=questionHtml)
            authorTimeItems = authorTimeSelector.re('itemprop="dateCreated".*?>(.*?)</')

            authorQuestionSelector = Selector(text=questionHtml)
            authorQuestionItems = authorQuestionSelector.css(
                '#que .ClapLv1TextBlock_Chie-TextBlock__3X4V5 *::text').getall()
            queText = authorQuestionItems[0]
            queGlan = authorQuestionItems[-2]  # 问题浏览量

            authorQuestionlabelSelector = Selector(text=questionHtml)  # 问题标签
            authorQuestionLabelItems = authorQuestionlabelSelector.css(
                '#que .ClapLv2QuestionItem_Chie-QuestionItem__SubAnchor__2Pv8w *::text').getall()

            authorThankSelector = Selector(text=questionHtml)  # 问题标签
            authorThankItems = authorThankSelector.css(
                '.ClapLv3BestAnswer_Chie-BestAnswer__Thanks__1ASeS *::text').getall()

            awardMoneyQuestionSelector = Selector(text=questionHtml)
            awardMoneyQuestionItems = awardMoneyQuestionSelector.css(
                '#que .ClapLv2QuestionItem_Chie-QuestionItem__SubChieCoin__2akxj *::text').getall()
            awardMoney = 0

            if len(awardMoneyQuestionItems) == 2:
                awardMoney = awardMoneyQuestionItems[1]

            baAnswerSelector = Selector(text=questionHtml)
            baAnswerSelectorItems = baAnswerSelector.css('#ba .ClapLv1TextBlock_Chie-TextBlock__3X4V5 *::text').getall()

            texts = []
            texts.append(queText)

            if 0 < len(baAnswerSelectorItems) <= 2:  # 特判最佳回答是否存在,以及特判是否有提问者感谢内容
                bestAnswer = baAnswerSelectorItems[1]
                texts.append(bestAnswer)
            else:
                bestAnswer = baAnswerSelectorItems[1]
                thanksAnswer = baAnswerSelectorItems[-3]
                thanksAnswerTime = baAnswerSelectorItems[-1]
                authorTimeItems.append(thanksAnswerTime)
                texts.append(bestAnswer)
                texts.append(thanksAnswer)

            anotherAnswerSelector = Selector(text=questionHtml)
            anotherAnswerItems = anotherAnswerSelector.css('#ans .ClapLv1TextBlock_Chie-TextBlock__3X4V5').getall()

            for anotherAnswer in anotherAnswerItems:
                texts.append(anotherAnswer)

            questionInformation.append(texts[0])  # 问题内容
            questionInformation.append(authorQuestionLabelItems)  # 问题标签
            questionInformation.append(authorTimeItems[0])  # 提问时间
            questionInformation.append(authorInfo)  # 提问者账户信息
            questionInformation.append(questionUrl)  # 问题URL
            questionInformation.append(awardMoney)  # 悬赏数

            answers = self.GetAnswersInfo(questionUrl)  # 回答

            if len(authorThankItems) > 4:  # 特判用户答谢部分是否存在
                ThansInfo = {}
                ThansInfo['AnswerText'] = authorThankItems[2].replace('\n', '')
                ThansInfo['AnswerTime'] = authorThankItems[4]
                ThansInfo['AnswerUserInfo'] = authorInfo
                ThansInfo['AnswerApprove'] = 0
                answers.append(ThansInfo)

            questionInformation.append(answers)  # 加入用户回答

            return questionInformation
        except Exception as e:
            # print(e,'bbbb')
            return []

    def WriteInCsv(self, csvSaveUrl, questionInformation):
        dataList = []
        for question, user, time, text in zip(*questionInformation):
            if user == '' or time == '' or text == '':  # 丢弃脏数据
                continue
            simpleInfomation = {}
            simpleInfomation['Question'] = question
            simpleInfomation['Time'] = time
            simpleInfomation['UserName'] = user
            simpleInfomation['Text'] = text
            dataList.append(simpleInfomation)

        fieldnames = ['Question', 'Time', 'UserName', 'Text']
        with open(csvSaveUrl, 'a+', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writerows(dataList)

    def WriteJson(self, jsonUrl, questionInformation):
        fieldnames = ['QuestionText', 'QuestionLabel', 'QuestionTime', 'QuestionerInformation', 'QuestionUrl',
                      'OfferReward', 'Answers']
        messageItem = {}
        try:
            if len(fieldnames) == len(questionInformation):
                for i in range(len(fieldnames)):
                    messageItem[fieldnames[i]] = questionInformation[i]
                json.dump(messageItem, open(jsonUrl, 'a+', encoding='utf-8'), indent=2, ensure_ascii=False)
        except:
            pass

    def GetCateName(self, categoryUrl):
        categoryHtml = self.GetHtml(categoryUrl)
        cateNameSelector = Selector(text=categoryHtml)
        cateName = cateNameSelector.css('.ClapLv2Title_Chie-Title__TextWrapper__1ccaf h1::text').get()
        return cateName

    def GetOtherAnswerNumber(self, answerUrl):
        '''
        :param answerUrl: 问题网页的资源定位符
        :return:该问题下的其他回复数
        '''
        answerHtml = self.GetHtml(answerUrl)
        answerAnswerSelector = Selector(text=answerHtml)
        answerNumber = answerAnswerSelector.css(
            '.ClapLv2QuestionItem_Chie-QuestionItem__AnswerNumber__3_0RR *::text').get()
        if answerNumber == None:  # 特判无法找到的情况
            answerNumber = 0
        return answerNumber

def main():
    WBSearch = WisdomBagSearch()
    workPath = './Plato44'
    if not os.path.exists(workPath):
        os.mkdir(workPath)
    firstLayerUrl = WBSearch.MainUrl + "/category"
    categoryHrefS = WBSearch.GetCategory(firstLayerUrl)  # 获取主页面下全部类别的href
    secondLayerUrls = WBSearch.preHanle_categoryHrefS(categoryHrefS)  # 对全部类别进行预处理成可以直接跳转的URL
    print("--启动YAHOO知惠网网络爬虫--")

    for secondLayerUrl in secondLayerUrls:
        print(secondLayerUrl)
        questionUrls = WBSearch.GetQuestionsUrls(secondLayerUrl)  # 记录一个类别其中前100页的所有能访问的问题URL

        categoryName = WBSearch.GetCateName(secondLayerUrl)
        print('正在爬取类别:' + categoryName)

        with open(os.path.join(workPath, categoryName + ".json"), 'w', encoding='utf-8') as w:
            s = 1  # 创建json文件

        for questionUrl in questionUrls:
            print(questionUrl)
            try:
                time.sleep(random.random())
                questionInformation = WBSearch.GetQuestionMessage(
                    questionUrl)  # 对每个问题下的html进行解析,提取出提问者名字,时间,内容。回答者们的名字,时间,内容
                # WriteInCsv(secondLayerAddess,questionInformation)
                print(questionInformation)
                if questionInformation == 0:
                    continue

            except Exception as e:
                print(type(e), e)

            jsonUrl = os.path.join(workPath, categoryName + ".json")
            if questionInformation != 0:
                if WBSearch.CheckQuestionStandard(questionInformation):
                    WBSearch.WriteJson(jsonUrl, questionInformation)

def Search(Url):
    WBSearch = WisdomBagSearch()
    workPath = './Plato43'
    secondLayerUrl = Url  # 记录一个类别其中前100页的所有能访问的问题URL
    categoryName = WBSearch.GetCateName(secondLayerUrl)
    print(categoryName)
    if not os.path.exists(workPath):
        os.mkdir(workPath)
    print(secondLayerUrl)
    questionUrls = WBSearch.GetQuestionsUrls(secondLayerUrl)  # 记录一个类别其中前100页的所有能访问的问题URL

    categoryName = WBSearch.GetCateName(secondLayerUrl)
    print('正在爬取类别:' + categoryName)

    with open(os.path.join(workPath, categoryName + ".json"), 'w', encoding='utf-8') as w:
        s = 1  # 创建json文件

    for questionUrl in questionUrls:
        print(questionUrl)
        # test
        try:
            time.sleep(random.random())
            questionInformation = WBSearch.GetQuestionMessage(questionUrl)  # 对每个问题下的html进行解析,提取出提问者名字,时间,内容。回答者们的名字,时间,内容
            # WriteInCsv(secondLayerAddess,questionInformation)
            print(questionInformation)
            if questionInformation == 0:
                continue

        except Exception as e:
            print(type(e), e)

        jsonUrl = os.path.join(workPath, categoryName + ".json")
        if questionInformation != 0:
            if WBSearch.CheckQuestionStandard(questionInformation):
                WBSearch.WriteJson(jsonUrl, questionInformation)

if __name__ == "__main__":
    main()
    Url = "https://chiebukuro.yahoo.co.jp/category/2078675272/question/list?flg=1"
    #Search(Url)
相关文章
|
15天前
|
存储 数据采集 人工智能
Python编程入门:从零基础到实战应用
本文是一篇面向初学者的Python编程教程,旨在帮助读者从零开始学习Python编程语言。文章首先介绍了Python的基本概念和特点,然后通过一个简单的例子展示了如何编写Python代码。接下来,文章详细介绍了Python的数据类型、变量、运算符、控制结构、函数等基本语法知识。最后,文章通过一个实战项目——制作一个简单的计算器程序,帮助读者巩固所学知识并提高编程技能。
|
1月前
|
数据采集 存储 JSON
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第27天】本文介绍了Python网络爬虫Scrapy框架的实战应用与技巧。首先讲解了如何创建Scrapy项目、定义爬虫、处理JSON响应、设置User-Agent和代理,以及存储爬取的数据。通过具体示例,帮助读者掌握Scrapy的核心功能和使用方法,提升数据采集效率。
105 6
|
15天前
|
小程序 开发者 Python
探索Python编程:从基础到实战
本文将引导你走进Python编程的世界,从基础语法开始,逐步深入到实战项目。我们将一起探讨如何在编程中发挥创意,解决问题,并分享一些实用的技巧和心得。无论你是编程新手还是有一定经验的开发者,这篇文章都将为你提供有价值的参考。让我们一起开启Python编程的探索之旅吧!
41 10
|
1月前
|
设计模式 前端开发 数据库
Python Web开发:Django框架下的全栈开发实战
【10月更文挑战第27天】本文介绍了Django框架在Python Web开发中的应用,涵盖了Django与Flask等框架的比较、项目结构、模型、视图、模板和URL配置等内容,并展示了实际代码示例,帮助读者快速掌握Django全栈开发的核心技术。
171 45
|
27天前
|
算法 Unix 数据库
Python编程入门:从基础到实战
本篇文章将带你进入Python编程的奇妙世界。我们将从最基础的概念开始,逐步深入,最后通过一个实际的项目案例,让你真正体验到Python编程的乐趣和实用性。无论你是编程新手,还是有一定基础的开发者,这篇文章都将为你提供有价值的信息和知识。让我们一起探索Python的世界吧!
|
28天前
|
并行计算 调度 开发者
探索Python中的异步编程:从基础到实战
在Python的世界里,异步编程是一种让程序运行更加高效、响应更快的技术。本文不仅会介绍异步编程的基本概念和原理,还将通过具体代码示例展示如何在Python中实现异步操作。无论你是初学者还是有经验的开发者,都能从中获益,了解如何运用这一技术优化你的项目。
|
28天前
|
数据处理 Python
探索Python中的异步编程:从基础到实战
在Python的世界中,“速度”不仅是赛车手的追求。本文将带你领略Python异步编程的魅力,从原理到实践,我们不单单是看代码,更通过实例感受它的威力。你将学会如何用更少的服务器资源做更多的事,就像是在厨房里同时烹饪多道菜而不让任何一道烧焦。准备好了吗?让我们开始这场技术烹饪之旅。
|
1月前
|
机器学习/深度学习 数据采集 数据可视化
Python数据科学实战:从Pandas到机器学习
Python数据科学实战:从Pandas到机器学习
|
1月前
|
数据采集 机器学习/深度学习 人工智能
Python编程入门:从基础到实战
【10月更文挑战第36天】本文将带你走进Python的世界,从基础语法出发,逐步深入到实际项目应用。我们将一起探索Python的简洁与强大,通过实例学习如何运用Python解决问题。无论你是编程新手还是希望扩展技能的老手,这篇文章都将为你提供有价值的指导和灵感。让我们一起开启Python编程之旅,用代码书写想法,创造可能。
|
1月前
|
数据库 Python
异步编程不再难!Python asyncio库实战,让你的代码流畅如丝!
在编程中,随着应用复杂度的提升,对并发和异步处理的需求日益增长。Python的asyncio库通过async和await关键字,简化了异步编程,使其变得流畅高效。本文将通过实战示例,介绍异步编程的基本概念、如何使用asyncio编写异步代码以及处理多个异步任务的方法,帮助你掌握异步编程技巧,提高代码性能。
70 4
下一篇
DataWorks