Download link: https://www.pan38.com/yun/share.php?code=JCnzE  Extraction code: 7765
The code below implements a basic Taobao data-collection framework made up of three modules: the spider core, data processing, and proxy management. Please note that this is only a technical demonstration, intended purely for learning; do not use it for any other purpose.
import requests
import re
import time
import random
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from urllib.parse import urlencode


class TaobaoSpider:
    def __init__(self):
        self.session = requests.Session()
        self.ua = UserAgent()
        self.headers = {
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Referer': 'https://www.taobao.com/',
            'Upgrade-Insecure-Requests': '1'
        }
        self.cookies = {}
        self.proxies = {}
        self.timeout = 10
        self.retry_times = 3
        self.delay = random.uniform(1, 3)

    def get_html(self, url):
        # Fetch a page with retries; return the HTML text, or None if every attempt fails.
        for _ in range(self.retry_times):
            try:
                time.sleep(self.delay)
                response = self.session.get(
                    url,
                    headers=self.headers,
                    cookies=self.cookies,
                    proxies=self.proxies,
                    timeout=self.timeout
                )
                if response.status_code == 200:
                    return response.text
                else:
                    print(f"Request failed, status code: {response.status_code}")
            except Exception as e:
                print(f"Request error: {str(e)}")
        return None

    def parse_shop_info(self, html):
        shop_info = {}
        soup = BeautifulSoup(html, 'html.parser')
        # Parse the shop name
        shop_name_tag = soup.find('div', class_='shop-name')
        if shop_name_tag:
            shop_info['name'] = shop_name_tag.get_text(strip=True)
        # Parse the WangWang (seller IM) account
        wangwang_tag = soup.find('a', class_='ww-light')
        if wangwang_tag:
            shop_info['wangwang'] = wangwang_tag.get('data-nick')
        return shop_info

    def search_shops(self, keyword, page=1):
        base_url = 'https://s.taobao.com/search'
        params = {
            'q': keyword,
            's': (page - 1) * 44,
            'ie': 'utf8'
        }
        url = f"{base_url}?{urlencode(params)}"
        html = self.get_html(url)
        if html:
            shop_links = re.findall(r'//shop(\d+)\.taobao\.com', html)
            return list(set(shop_links))
        return []

    def run(self, keyword, max_page=1):
        all_shops = []
        for page in range(1, max_page + 1):
            print(f"Collecting page {page}...")
            shop_ids = self.search_shops(keyword, page)
            for shop_id in shop_ids:
                shop_url = f"https://shop{shop_id}.taobao.com"
                html = self.get_html(shop_url)
                if html:
                    shop_info = self.parse_shop_info(html)
                    shop_info['url'] = shop_url
                    all_shops.append(shop_info)
                    print(f"Collected shop: {shop_info.get('name', 'unknown')}")
        return all_shops


if __name__ == '__main__':
    spider = TaobaoSpider()
    keyword = input("Enter a search keyword: ")
    max_page = int(input("Enter the number of pages to collect: "))
    result = spider.run(keyword, max_page)
    print(f"Collected {len(result)} shop records in total")
import json
import csv
import pandas as pd
from datetime import datetime


class DataProcessor:
    @staticmethod
    def save_to_json(data, filename):
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

    @staticmethod
    def save_to_csv(data, filename):
        if not data:
            return
        keys = data[0].keys()
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=keys)
            writer.writeheader()
            writer.writerows(data)

    @staticmethod
    def save_to_excel(data, filename):
        if not data:
            return
        df = pd.DataFrame(data)
        # Note: recent pandas versions no longer accept an encoding argument here.
        df.to_excel(filename, index=False)

    @staticmethod
    def generate_report(data):
        report = {
            'total_count': len(data),
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'sample_data': data[:3] if len(data) > 3 else data
        }
        return report

    @staticmethod
    def filter_data(data, condition_func):
        return [item for item in data if condition_func(item)]
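A minimal sketch of how the spider's output could be handed to DataProcessor. The keyword, the shops variable, and the output filenames are placeholders; shops is assumed to be the list of dicts returned by TaobaoSpider.run().

# Assumes `spider` is a TaobaoSpider instance and the search keyword is arbitrary;
# output filenames are placeholders.
shops = spider.run('keyword', max_page=1)

DataProcessor.save_to_json(shops, 'shops.json')
DataProcessor.save_to_csv(shops, 'shops.csv')

report = DataProcessor.generate_report(shops)
print(report['total_count'], report['timestamp'])

# filter_data takes any predicate, e.g. keep only shops whose name was parsed successfully.
named_shops = DataProcessor.filter_data(shops, lambda item: 'name' in item)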
import requests
from threading import Lock


class ProxyManager:
    def __init__(self):
        self.proxy_list = []
        self.current_index = 0
        self.lock = Lock()

    def load_proxies(self, api_url=None, file_path=None):
        if api_url:
            try:
                response = requests.get(api_url)
                if response.status_code == 200:
                    self.proxy_list = [line.strip() for line in response.text.split('\n') if line.strip()]
            except Exception as e:
                print(f"Failed to load proxies from API: {str(e)}")
        if file_path:
            try:
                with open(file_path, 'r') as f:
                    self.proxy_list = [line.strip() for line in f if line.strip()]
            except Exception as e:
                print(f"Failed to load proxies from file: {str(e)}")

    def get_proxy(self):
        # Round-robin over the proxy pool; the lock keeps the index consistent across threads.
        with self.lock:
            if not self.proxy_list:
                return None
            proxy = self.proxy_list[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.proxy_list)
            return {
                'http': f'http://{proxy}',
                'https': f'https://{proxy}'
            }

    def remove_proxy(self, proxy):
        with self.lock:
            if proxy in self.proxy_list:
                self.proxy_list.remove(proxy)

    def check_proxy(self, proxy, test_url='https://www.taobao.com'):
        try:
            response = requests.get(
                test_url,
                proxies={'http': proxy, 'https': proxy},
                timeout=10
            )
            return response.status_code == 200
        except Exception:
            return False
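A sketch of wiring ProxyManager into the spider: load a proxy pool, drop entries that fail the health check, and assign a working proxy to the spider before running. The proxies.txt path and its one-"host:port"-per-line format are assumptions for illustration, not something the original post specifies.

# proxies.txt is a hypothetical file with one "host:port" entry per line.
manager = ProxyManager()
manager.load_proxies(file_path='proxies.txt')

# Remove proxies that cannot reach the test URL (iterate over a copy while mutating the pool).
for p in list(manager.proxy_list):
    if not manager.check_proxy(f'http://{p}'):
        manager.remove_proxy(p)

spider = TaobaoSpider()
proxy = manager.get_proxy()
if proxy:
    spider.proxies = proxy  # requests-style dict returned by get_proxy()
result = spider.run('keyword', max_page=1)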