pyppeteer如何使用隧道代理

简介: pyppeteer如何使用隧道代理

之前的文章中我们分享了很多Selenium的使用知识,它功能的确非常强大,但Selenium 也不是完美的,实际使用中有些地方还是不方便,比如环境的配置,得安装好相关浏览器,比如 Chrome、Firefox 等等,然后还要到官方网站去下载对应的驱动,最重要的还需要安装对应的 Python Selenium 库,而且版本也得好好看看是否对应,确实不是很方便,另外如果要做大规模部署的话,环境配置的一些问题也是个头疼的事情。
所以今天我们就给大家介绍一个类似的替代品,叫作 Pyppeteer,那Pyppeteer 又是什么呢?它实际上是 Puppeteer 的 Python 版本的实现,但它不是 Google 开发的,是一位来自于日本的工程师依据 Puppeteer 的一些功能开发出来的非官方版本。
Pyppeteer 是依赖于 Chromium 这个浏览器来运行的。那么有了 Pyppeteer 之后,我们就可以免去那些烦琐的环境配置等问题。如果第一次运行的时候,Chromium 浏览器没有安装,那么程序会帮我们自动安装和配置,就免去了烦琐的环境配置等工作。另外 Pyppeteer 是基于 Python 的新特性 async 实现的,所以它的一些执行也支持异步操作,效率相对于 Selenium 来说也提高了。
那么下面就让我们来一起了解下 Pyppeteer 的相关用法吧。
代理写法,亿牛云代理隧道代码
```#! -- encoding:utf-8 --

import requests
import random

# 要访问的目标页面
targetUrl = "http://httpbin.org/ip"

# 要访问的目标HTTPS页面
# targetUrl = "https://httpbin.org/ip"

# 代理服务器(产品官网 www.16yun.cn)
proxyHost = "t.16yun.cn"
proxyPort = "31111"

# 代理验证信息
proxyUser = "username"
proxyPass = "password"

```

pyppeteer使用隧道代理demo

``` #! -- encoding:utf-8 --
import websockets
from scrapy.http import HtmlResponse
from logging import getLogger
import asyncio
import pyppeteer
import logging
from concurrent.futures._base import TimeoutError
import base64
import sys
import random

pyppeteer_level = logging.WARNING
logging.getLogger('websockets.protocol').setLevel(pyppeteer_level)
logging.getLogger('pyppeteer').setLevel(pyppeteer_level)

PY3 = sys.version_info[0] >= 3


def base64ify(bytes_or_str):
    if PY3 and isinstance(bytes_or_str, str):
        input_bytes = bytes_or_str.encode('utf8')
    else:
        input_bytes = bytes_or_str

    output_bytes = base64.urlsafe_b64encode(input_bytes)
    if PY3:
        return output_bytes.decode('ascii')
    else:
        return output_bytes


class ProxyMiddleware(object):
    # 加载随机UserAgent(根据需求)
    # USER_AGENT = open('useragents.txt').readlines()

    def process_request(self, request, spider):
        # 代理服务器
        proxyHost = "t.16yun.cn"
        proxyPort = "31111"

        # 代理验证信息
        proxyUser = "username"
        proxyPass = "password"

        request.meta['proxy'] = "http://{0}:{1}".format(proxyHost, proxyPort)

        # 添加验证头
        encoded_user_pass = base64ify(proxyUser + ":" + proxyPass)
        request.headers['Proxy-Authorization'] = 'Basic ' + encoded_user_pass

        # 设置IP切换头(根据需求)
        tunnel = random.randint(1, 10000)
        request.headers['Proxy-Tunnel'] = str(tunnel)

        # 设置随机UserAgent(根据需求)
        # request.headers['User-Agent'] = random.choice(self.USER_AGENT)

class PyppeteerMiddleware(object):
    def __init__(self, **args):
        """
        init logger, loop, browser
        :param args:
        """
        self.logger = getLogger(__name__)
        self.loop = asyncio.get_event_loop()
        self.browser = self.loop.run_until_complete(
            pyppeteer.launch(headless=True))
        self.args = args

    def __del__(self):
        """
        close loop
        :return:
        """
        self.loop.close()

    def render(self, url, retries=1, script=None, wait=0.3, scrolldown=False, sleep=0,
               timeout=8.0, keep_page=False):
        """
        render page with pyppeteer
        :param url: page url
        :param retries: max retry times
        :param script: js script to evaluate
        :param wait: number of seconds to wait before loading the page, preventing timeouts
        :param scrolldown: how many times to page down
        :param sleep: how many long to sleep after initial render
        :param timeout: the longest wait time, otherwise raise timeout error
        :param keep_page: keep page not to be closed, browser object needed
        :param browser: pyppetter browser object
        :param with_result: return with js evaluation result
        :return: content, [result]
        """

        # define async render
        async def async_render(url, script, scrolldown, sleep, wait, timeout, keep_page):
            try:
                # basic render
                page = await self.browser.newPage()
                await asyncio.sleep(wait)
                response = await page.goto(url, options={'timeout': int(timeout * 1000)})
                if response.status != 200:
                    return None, None, response.status
                result = None
                # evaluate with script
                if script:
                    result = await page.evaluate(script)

                # scroll down for {scrolldown} times
                if scrolldown:
                    for _ in range(scrolldown):
                        await page._keyboard.down('PageDown')
                        await asyncio.sleep(sleep)
                else:
                    await asyncio.sleep(sleep)
                if scrolldown:
                    await page._keyboard.up('PageDown')

                # get html of page
                content = await page.content()

                return content, result, response.status
            except TimeoutError:
                return None, None, 500
            finally:
                # if keep page, do not close it
                if not keep_page:
                    await page.close()

        content, result, status = [None] * 3

        # retry for {retries} times
        for i in range(retries):
            if not content:
                content, result, status = self.loop.run_until_complete(
                    async_render(url=url, script=script, sleep=sleep, wait=wait,
                                 scrolldown=scrolldown, timeout=timeout, keep_page=keep_page))
            else:
                break

        # if need to return js evaluation result
        return content, result, status

    def process_request(self, request, spider):
        """
        :param request: request object
        :param spider: spider object
        :return: HtmlResponse
        """
        if request.meta.get('render'):
            try:
                self.logger.debug('rendering %s', request.url)
                html, result, status = self.render(request.url)
                return HtmlResponse(url=request.url, body=html, request=request, encoding='utf-8',
                                    status=status)
            except websockets.exceptions.ConnectionClosed:
                pass

    @classmethod
    def from_crawler(cls, crawler):
        return cls(**crawler.settings.get('PYPPETEER_ARGS', {}))
相关文章
|
1月前
|
自然语言处理 计算机视觉 Python
SoccerNet 2025挑战赛:赛题整理(一)
介绍SoccerNet数据集和SoccerNet 2025挑战赛,并梳理SoccerNet 2025挑战赛中的赛题一
226 96
|
关系型数据库 MySQL 数据处理
FlinkCDC 增量读取 binlog 数据比较慢
FlinkCDC 增量读取 binlog 数据比较慢
696 1
|
29天前
|
机器学习/深度学习 人工智能 前端开发
通义DeepResearch全面开源!同步分享可落地的高阶Agent构建方法论
通义研究团队开源发布通义 DeepResearch —— 首个在性能上可与 OpenAI DeepResearch 相媲美、并在多项权威基准测试中取得领先表现的全开源 Web Agent。
1830 89
|
数据采集 Web App开发 JavaScript
Puppeteer动态代理实战:提升数据抓取效率
使用Puppeteer进行网页抓取时,通过动态代理提高效率。配置代理服务器如亿牛云,结合`puppeteer.launch`设置代理参数。导航至目标网页,等待图片加载,然后抓取并下载图片资源。代理有助于避开反爬策略,确保数据抓取的稳定性和效率。
318 4
Puppeteer动态代理实战:提升数据抓取效率
|
PHP 数据库
laravel表单验证的exists、unique去除软删除字段的校验
虽然Laravel的验证系统非常强大和灵活,但在处理软删除数据时仍需要特别注意。通过正确使用验证规则,并在需要时创建自定义验证规则,你可以确保应用的数据验证既准确又高效。记得在对 `unique`和 `exists`规则进程自定义时,清晰地注明你排除软删除记录的意图,这将使得代码更容易理解和维护。
336 4
|
Python
丢弃 Tkinter!几行代码快速生成漂亮 GUI!
Python 的 GUI 框架并不少,其中 Tkinter,wxPython,Qt 和 Kivy 是几种比较主流的框架。此外,还有不少在上述框架基础上封装的简化框架,例如 EasyGUI,PyGUI 和 Pyforms 等。
585 0
丢弃 Tkinter!几行代码快速生成漂亮 GUI!
|
数据采集 数据可视化 Python
解析python爬取Ebay数据的方式
解析python爬取Ebay数据的方式
|
iOS开发 MacOS Windows
比微信文件传输助手更好用的传输工具|Telegram
比微信文件传输助手更好用的传输工具|Telegram
|
算法 网络安全 数据安全/隐私保护
|
SQL 关系型数据库 MySQL
MySQL 删除表中的数据记录
前言 删除数据记录是数据操作中常见的操作,可以删除表中已经存在的数据记录。在MySQL中可以通过DELETE语句来删除数据记录,该SQL语句可以通过以下几种方式使用:删除特定数据记录、删除所有数据记录。