With the theory covered, let's consolidate our BeautifulSoup skills through three complete, hands-on projects. This chapter builds a Douban Movie Top250 scraper, a news-site content extractor, and an e-commerce product scraper, covering the most common data-collection scenarios.
Scraping the Douban Movie Top250 is one of the classic web-scraping exercises; we will collect the full set of information for all 250 titles.
First, examine the structure of the Douban Top250 list pages:
Each list page follows the pattern https://movie.douban.com/top250?start=0, where the `start` query parameter controls pagination (25 movies per page), and every movie entry sits inside a `<div class="item">` element. With that structure in mind, the complete scraper looks like this:

```python
import requests
from bs4 import BeautifulSoup
import time
import csv
import re
class DoubanMovieScraper:
"""豆瓣电影Top250抓取器"""
def __init__(self):
self.base_url = "https://movie.douban.com/top250"
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Connection': 'keep-alive',
}
self.movies = []
def get_page(self, page_num):
"""获取指定页码的页面内容"""
start = (page_num - 1) * 25
url = f"{self.base_url}?start={start}"
try:
print(f"正在获取第 {page_num} 页...")
response = requests.get(url, headers=self.headers, timeout=10)
response.raise_for_status()
# 设置编码
response.encoding = 'utf-8'
return response.text
except Exception as e:
print(f"获取第 {page_num} 页失败: {e}")
return None
def parse_movie_item(self, item):
"""解析单个电影项目"""
movie = {}
# 获取排名
rank_elem = item.find('em')
movie['rank'] = rank_elem.text if rank_elem else 'N/A'
# 获取电影标题(中文和英文)
title_elem = item.find('span', class_='title')
if title_elem:
movie['title'] = title_elem.text.strip()
# 获取英文标题
other_title_elem = item.find('span', class_='other')
if other_title_elem:
movie['other_title'] = other_title_elem.text.strip()
# 获取评分
rating_elem = item.find('span', class_='rating_num')
movie['rating'] = rating_elem.text if rating_elem else 'N/A'
        # Rating count: guard against a missing .star block before indexing into its spans
        star_elem = item.find('div', class_='star')
        if star_elem:
            spans = star_elem.find_all('span')
            rating_count_text = spans[-1].text if spans else ''
            numbers = re.findall(r'\d+', rating_count_text)
            movie['rating_count'] = numbers[0] if numbers else '0'
        else:
            movie['rating_count'] = '0'
# 获取导演、年份、地区、类型等信息
info_elem = item.find('div', class_='bd').find('p')
if info_elem:
info_text = info_elem.text.strip()
# 分割信息行
lines = info_text.split('\n')
if len(lines) >= 2:
                # First line: director (and lead cast); keep only the director portion
                director_line = lines[0].strip()
                director_part = director_line.split('主演')[0]
                movie['director'] = re.sub(r'^导演[::]\s*', '', director_part).strip()
# 第二行:年份/地区/类型
details_line = lines[1].strip()
parts = details_line.split('/')
if len(parts) >= 3:
movie['year'] = parts[0].strip()
movie['region'] = parts[1].strip()
movie['genre'] = parts[2].strip()
# 获取简介
quote_elem = item.find('span', class_='inq')
movie['quote'] = quote_elem.text if quote_elem else 'N/A'
# 获取电影链接
link_elem = item.find('a')
movie['link'] = link_elem['href'] if link_elem and link_elem.has_attr('href') else ''
# 获取图片链接
img_elem = item.find('img')
movie['image_url'] = img_elem['src'] if img_elem and img_elem.has_attr('src') else ''
return movie
def parse_page(self, html_content):
"""解析页面内容,提取电影信息"""
if not html_content:
return []
soup = BeautifulSoup(html_content, 'lxml')
movie_items = soup.find_all('div', class_='item')
movies_on_page = []
for item in movie_items:
movie = self.parse_movie_item(item)
if movie:
movies_on_page.append(movie)
return movies_on_page
def scrape_all_pages(self, max_pages=10):
"""抓取所有页面"""
print("开始抓取豆瓣电影Top250...")
for page_num in range(1, max_pages + 1):
# 获取页面
html = self.get_page(page_num)
if not html:
continue
# 解析页面
movies = self.parse_page(html)
self.movies.extend(movies)
print(f"第 {page_num} 页完成,找到 {len(movies)} 部电影")
# 礼貌性延迟,避免请求过快
time.sleep(2)
print(f"\n抓取完成!总共找到 {len(self.movies)} 部电影")
return self.movies
def save_to_csv(self, filename='douban_top250.csv'):
"""保存数据到CSV文件"""
if not self.movies:
print("没有数据可保存")
return
# 定义CSV字段
fieldnames = [
'rank', 'title', 'other_title', 'rating', 'rating_count',
'director', 'year', 'region', 'genre', 'quote', 'link', 'image_url'
]
try:
with open(filename, 'w', newline='', encoding='utf-8-sig') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for movie in self.movies:
writer.writerow(movie)
print(f"数据已保存到 {filename}")
return True
except Exception as e:
print(f"保存CSV文件失败: {e}")
return False
def save_to_json(self, filename='douban_top250.json'):
"""保存数据到JSON文件"""
import json
if not self.movies:
print("没有数据可保存")
return
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(self.movies, f, ensure_ascii=False, indent=2)
print(f"数据已保存到 {filename}")
return True
except Exception as e:
print(f"保存JSON文件失败: {e}")
return False
def analyze_data(self):
"""数据分析示例"""
if not self.movies:
print("没有数据可分析")
return
print("\n=== 数据分析 ===")
# 统计每年的电影数量
year_counts = {}
for movie in self.movies:
year = movie.get('year', '未知')
if year in year_counts:
year_counts[year] += 1
else:
year_counts[year] = 1
# 找出产量最高的年份
top_years = sorted(year_counts.items(), key=lambda x: x[1], reverse=True)[:5]
print("电影数量最多的年份(前5名):")
for year, count in top_years:
print(f" {year}: {count} 部")
# 统计导演作品数量
director_counts = {}
for movie in self.movies:
director = movie.get('director', '').split(' ')[0] # 只取第一位导演
if director and director != 'N/A':
if director in director_counts:
director_counts[director] += 1
else:
director_counts[director] = 1
# 找出作品最多的导演
top_directors = sorted(director_counts.items(), key=lambda x: x[1], reverse=True)[:5]
print("\n作品最多的导演(前5名):")
for director, count in top_directors:
print(f" {director}: {count} 部")
# 平均评分
total_rating = 0
count = 0
for movie in self.movies:
rating = movie.get('rating', '0')
if rating != 'N/A':
try:
total_rating += float(rating)
count += 1
                except ValueError:
                    pass  # skip ratings that are not numeric
if count > 0:
avg_rating = total_rating / count
print(f"\nTop250平均评分: {avg_rating:.2f}")
# 评分分布
rating_dist = {
'9分以上': 0,
'8-9分': 0,
'7-8分': 0,
'7分以下': 0
}
for movie in self.movies:
rating = movie.get('rating', '0')
if rating != 'N/A':
try:
rating_float = float(rating)
if rating_float >= 9:
rating_dist['9分以上'] += 1
elif rating_float >= 8:
rating_dist['8-9分'] += 1
elif rating_float >= 7:
rating_dist['7-8分'] += 1
else:
rating_dist['7分以下'] += 1
                except ValueError:
                    pass  # skip ratings that are not numeric
print("\n评分分布:")
for category, count in rating_dist.items():
print(f" {category}: {count} 部")
# 使用示例
if __name__ == "__main__":
scraper = DoubanMovieScraper()
# 抓取数据(这里只抓取前2页作为示例,完整抓取请设置为10页)
movies = scraper.scrape_all_pages(max_pages=2)
if movies:
# 保存数据
scraper.save_to_csv('douban_top250_sample.csv')
scraper.save_to_json('douban_top250_sample.json')
# 数据分析
scraper.analyze_data()
# 显示前3部电影信息
print("\n=== 前3部电影信息 ===")
for i, movie in enumerate(movies[:3], 1):
print(f"\n{i}. {movie.get('title', 'N/A')}")
print(f" 排名: {movie.get('rank', 'N/A')}")
print(f" 评分: {movie.get('rating', 'N/A')} ({movie.get('rating_count', '0')}人评价)")
print(f" 导演: {movie.get('director', 'N/A')}")
print(f" 年份/地区/类型: {movie.get('year', 'N/A')}/{movie.get('region', 'N/A')}/{movie.get('genre', 'N/A')}")
print(f" 简介: {movie.get('quote', 'N/A')}")
```

Once the basic scraper works, a few optimizations make longer runs faster and more robust: reuse a single `Session`, fetch pages concurrently with a small thread pool, and checkpoint progress so an interrupted run can resume.

```python
# 1. Reuse a single Session to cut connection overhead
scraper = DoubanMovieScraper()
session = requests.Session()
session.headers.update(scraper.headers)

# 2. Fetch several pages concurrently with a thread pool
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_page_concurrently(scraper, page_nums, max_workers=3):
    """Fetch multiple pages concurrently; returns a list of (page_num, html) pairs."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit one download task per page
        future_to_page = {
            executor.submit(scraper.get_page, page_num): page_num
            for page_num in page_nums
        }
        results = []
        for future in as_completed(future_to_page):
            page_num = future_to_page[future]
            try:
                html = future.result()
                results.append((page_num, html))
            except Exception as e:
                print(f"第 {page_num} 页抓取失败: {e}")
    return results
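
# A quick usage sketch (assuming the scraper instance and fetch_page_concurrently above):
# feed the concurrently fetched pages straight into parse_page. Keep max_workers small,
# since Douban rate-limits aggressive clients.
pages = fetch_page_concurrently(scraper, page_nums=range(1, 5), max_workers=3)
for page_num, html in sorted(pages):
    if html:
        scraper.movies.extend(scraper.parse_page(html))
print(f"Fetched {len(scraper.movies)} movies from {len(pages)} pages")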

# 3. Resumable scraping: persist progress so an interrupted run can pick up where it left off
import json
import os
class ResumeScraper(DoubanMovieScraper):
"""支持断点续传的爬虫"""
def __init__(self, state_file='scraper_state.json'):
super().__init__()
self.state_file = state_file
self.load_state()
def load_state(self):
"""加载爬取状态"""
if os.path.exists(self.state_file):
try:
with open(self.state_file, 'r', encoding='utf-8') as f:
state = json.load(f)
self.movies = state.get('movies', [])
self.last_page = state.get('last_page', 0)
print(f"加载状态:已抓取 {len(self.movies)} 部电影,最后抓取页面:{self.last_page}")
except Exception as e:
print(f"加载状态失败: {e}")
self.movies = []
self.last_page = 0
else:
self.movies = []
self.last_page = 0
def save_state(self):
"""保存爬取状态"""
state = {
'movies': self.movies,
'last_page': self.last_page,
'timestamp': time.time()
}
try:
with open(self.state_file, 'w', encoding='utf-8') as f:
json.dump(state, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"保存状态失败: {e}")
def scrape_with_resume(self, max_pages=10):
"""支持断点续传的抓取"""
start_page = self.last_page + 1
for page_num in range(start_page, max_pages + 1):
print(f"正在抓取第 {page_num} 页...")
html = self.get_page(page_num)
if not html:
continue
movies_on_page = self.parse_page(html)
self.movies.extend(movies_on_page)
self.last_page = page_num
# 每抓取一页保存一次状态
self.save_state()
print(f"第 {page_num} 页完成,找到 {len(movies_on_page)} 部电影")
time.sleep(2)
print(f"\n抓取完成!总共找到 {len(self.movies)} 部电影")
return self.movies
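
# A minimal usage sketch for ResumeScraper (assuming the class above): if a previous run
# stopped partway through, this picks up at the next page and checkpoints after every page.
resume_scraper = ResumeScraper(state_file='scraper_state.json')
movies = resume_scraper.scrape_with_resume(max_pages=10)
resume_scraper.save_to_csv('douban_top250_resumed.csv')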
```

News sites tend to have more complex layouts and more dynamically loaded content than a static list like the Top250. This case study shows how to handle such sites with a configurable, reusable scraper.

```python
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import time
from datetime import datetime
import json
class NewsScraper:
"""通用新闻网站抓取器"""
def __init__(self, base_url, config=None):
self.base_url = base_url
self.session = requests.Session()
self.articles = []
# 默认配置
self.config = {
'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'timeout': 10,
'delay': 1,
'max_articles': 50,
'selectors': {
'article': 'article, .news-item, .post, .story',
'title': 'h1, h2, .title, .headline',
'content': '.content, .article-body, .post-content, .story-content',
'author': '.author, .byline, .writer',
'date': '.date, .time, .published, time',
'category': '.category, .section, .tag',
'summary': '.summary, .excerpt, .description'
}
}
# 合并自定义配置
if config:
self.config.update(config)
# 设置请求头
self.session.headers.update({
'User-Agent': self.config['user_agent'],
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
})
def normalize_url(self, url):
"""规范化URL"""
if not urlparse(url).scheme:
url = urljoin(self.base_url, url)
return url
def fetch_page(self, url):
"""获取页面内容"""
try:
response = self.session.get(url, timeout=self.config['timeout'])
response.raise_for_status()
# 自动检测编码
response.encoding = response.apparent_encoding
return response.text
except Exception as e:
print(f"获取页面失败 {url}: {e}")
return None
def extract_with_selectors(self, soup, selector_key, default=''):
"""使用多种选择器提取内容"""
selectors = self.config['selectors'].get(selector_key, '')
if isinstance(selectors, str):
selectors = [s.strip() for s in selectors.split(',')]
for selector in selectors:
if selector:
element = soup.select_one(selector)
if element:
# 处理特殊元素
if selector == 'time' and element.has_attr('datetime'):
return element['datetime']
return element.get_text(strip=True)
return default
def parse_article_page(self, url):
"""解析文章详情页"""
print(f"正在解析文章: {url}")
html = self.fetch_page(url)
if not html:
return None
soup = BeautifulSoup(html, 'lxml')
article = {
'url': url,
'title': self.extract_with_selectors(soup, 'title'),
'content': self.extract_with_selectors(soup, 'content'),
'author': self.extract_with_selectors(soup, 'author'),
'date': self.extract_with_selectors(soup, 'date'),
'category': self.extract_with_selectors(soup, 'category'),
'summary': self.extract_with_selectors(soup, 'summary'),
'images': [],
'links': [],
'scraped_at': datetime.now().isoformat()
}
# 提取图片
for img in soup.select('img'):
src = img.get('src', '')
if src:
src = self.normalize_url(src)
article['images'].append({
'src': src,
'alt': img.get('alt', ''),
'title': img.get('title', '')
})
        # Internal links: keep same-host links and relative links (which have an empty netloc)
        base_netloc = urlparse(self.base_url).netloc
        for a in soup.select('a[href]'):
            href = a.get('href', '')
            if not href or href.startswith(('javascript:', 'mailto:', '#')):
                continue
            netloc = urlparse(href).netloc
            if not netloc or netloc == base_netloc:
                article['links'].append({
                    'url': self.normalize_url(href),
                    'text': a.get_text(strip=True)[:100]
                })
# 清理内容
if article['content']:
# 移除多余空白
article['content'] = ' '.join(article['content'].split())
# 限制长度
if len(article['content']) > 5000:
article['content'] = article['content'][:5000] + '...'
return article
def extract_article_links(self, soup):
"""从列表页提取文章链接"""
article_selectors = self.config['selectors']['article']
if isinstance(article_selectors, str):
article_selectors = [s.strip() for s in article_selectors.split(',')]
links = set()
for selector in article_selectors:
if selector:
for element in soup.select(selector):
# 查找链接
link_elem = element.find('a')
if link_elem and link_elem.has_attr('href'):
href = link_elem['href']
if href and not href.startswith('javascript:'):
full_url = self.normalize_url(href)
links.add(full_url)
return list(links)
def scrape_category(self, category_url, max_articles=None):
"""抓取分类页面下的所有文章"""
if max_articles is None:
max_articles = self.config['max_articles']
print(f"开始抓取分类: {category_url}")
# 获取分类页面
html = self.fetch_page(category_url)
if not html:
return []
soup = BeautifulSoup(html, 'lxml')
# 提取文章链接
article_urls = self.extract_article_links(soup)
print(f"找到 {len(article_urls)} 个文章链接")
# 限制抓取数量
article_urls = article_urls[:max_articles]
# 逐个抓取文章
articles = []
for i, url in enumerate(article_urls, 1):
print(f"正在处理第 {i}/{len(article_urls)} 篇文章")
article = self.parse_article_page(url)
if article:
articles.append(article)
# 礼貌性延迟
time.sleep(self.config['delay'])
print(f"分类抓取完成,成功获取 {len(articles)} 篇文章")
return articles
def scrape_multiple_categories(self, category_urls):
"""抓取多个分类"""
all_articles = []
for category_url in category_urls:
articles = self.scrape_category(category_url)
all_articles.extend(articles)
# 避免请求过快
time.sleep(self.config['delay'] * 2)
# 去重(基于URL)
unique_articles = []
seen_urls = set()
for article in all_articles:
if article['url'] not in seen_urls:
seen_urls.add(article['url'])
unique_articles.append(article)
print(f"总共抓取 {len(unique_articles)} 篇唯一文章")
self.articles = unique_articles
return unique_articles
def save_articles(self, filename='news_articles.json'):
"""保存文章数据"""
if not self.articles:
print("没有文章数据可保存")
return False
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(self.articles, f, ensure_ascii=False, indent=2)
print(f"文章数据已保存到 {filename}")
return True
except Exception as e:
print(f"保存文章数据失败: {e}")
return False
def generate_summary(self):
"""生成抓取摘要"""
if not self.articles:
return "没有数据"
summary = {
'total_articles': len(self.articles),
'categories': set(),
'authors': set(),
'date_range': {
'earliest': None,
'latest': None
}
}
for article in self.articles:
# 收集分类
if article['category']:
summary['categories'].add(article['category'])
# 收集作者
if article['author']:
summary['authors'].add(article['author'])
# 更新日期范围
if article['date']:
try:
date_obj = datetime.fromisoformat(article['date'].replace('Z', '+00:00'))
if not summary['date_range']['earliest'] or date_obj < summary['date_range']['earliest']:
summary['date_range']['earliest'] = date_obj
if not summary['date_range']['latest'] or date_obj > summary['date_range']['latest']:
summary['date_range']['latest'] = date_obj
except:
pass
summary['categories'] = list(summary['categories'])
summary['authors'] = list(summary['authors'])
return summary
# 示例配置和用法
def example_news_scraping():
"""新闻网站抓取示例"""
# 配置抓取器(以示例网站为例)
config = {
'max_articles': 10, # 每个分类最多抓取10篇文章
'delay': 2, # 请求延迟
'selectors': {
'article': '.post, .article-item, .news-item',
'title': 'h1.entry-title, h1.article-title, .title',
'content': '.entry-content, .article-content, .content',
'author': '.author, .byline, .post-author',
'date': '.post-date, .article-date, time',
'category': '.category, .post-category, .section',
'summary': '.entry-summary, .excerpt, .description'
}
}
# 创建抓取器
scraper = NewsScraper(
base_url='https://example-news.com',
config=config
)
# 定义要抓取的分类
categories = [
'https://example-news.com/category/technology',
'https://example-news.com/category/business',
'https://example-news.com/category/science'
]
# 开始抓取
articles = scraper.scrape_multiple_categories(categories)
if articles:
# 保存数据
scraper.save_articles('news_data.json')
# 生成摘要
summary = scraper.generate_summary()
print("\n=== 抓取摘要 ===")
print(f"总文章数: {summary['total_articles']}")
print(f"分类数: {len(summary['categories'])}")
print(f"作者数: {len(summary['authors'])}")
# 显示前3篇文章
print("\n=== 示例文章 ===")
for i, article in enumerate(articles[:3], 1):
print(f"\n{i}. {article['title']}")
print(f" 作者: {article['author'] or '未知'}")
print(f" 日期: {article['date'] or '未知'}")
print(f" 分类: {article['category'] or '未知'}")
print(f" 摘要: {article['summary'][:100] if article['summary'] else '无'}...")
return articles
# 注意:实际使用时,需要根据目标网站的结构调整选择器配置
if __name__ == "__main__":
# 这是一个示例,实际使用时需要替换为真实的URL和选择器
print("这是一个新闻抓取器的示例代码")
print("实际使用时需要根据目标网站的结构调整选择器配置")
# example_news_scraping()
```

Sites that load their content with JavaScript cannot be fetched with requests alone; in that case you can drive a real browser with Selenium and hand the rendered HTML to BeautifulSoup. The following example combines the two.

```python
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
import time
class DynamicNewsScraper:
"""处理动态加载内容的新闻抓取器"""
def __init__(self):
# 配置Selenium
options = webdriver.ChromeOptions()
options.add_argument('--headless') # 无头模式
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
self.driver = webdriver.Chrome(options=options)
self.wait = WebDriverWait(self.driver, 10)
def scroll_to_load_content(self, scroll_pause_time=2, max_scrolls=10):
"""滚动页面加载更多内容"""
last_height = self.driver.execute_script("return document.body.scrollHeight")
for _ in range(max_scrolls):
# 滚动到底部
self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
# 等待加载
time.sleep(scroll_pause_time)
# 计算新的滚动高度
new_height = self.driver.execute_script("return document.body.scrollHeight")
if new_height == last_height:
break
last_height = new_height
def click_load_more(self, button_selector):
"""点击"加载更多"按钮"""
try:
load_more_button = self.wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, button_selector))
)
load_more_button.click()
time.sleep(2) # 等待内容加载
return True
except:
return False
def scrape_dynamic_news(self, url, max_articles=20):
"""抓取动态加载的新闻"""
print(f"访问动态页面: {url}")
self.driver.get(url)
# 等待页面初始加载
time.sleep(3)
# 滚动加载更多内容
self.scroll_to_load_content()
        # Try clicking a "load more" button if one exists.
        # Note: ':contains()' is not valid CSS in the browser, so only class-based selectors
        # are used here; match button text with an XPath text() expression if you need to.
        load_more_selectors = [
            '.load-more', '.more-articles', '.see-more'
        ]
for selector in load_more_selectors:
if self.click_load_more(selector):
# 再次滚动
self.scroll_to_load_content()
# 获取页面源码
page_source = self.driver.page_source
# 使用BeautifulSoup解析
soup = BeautifulSoup(page_source, 'lxml')
# 提取文章信息(根据实际网站结构调整)
articles = []
article_elements = soup.select('article, .news-item, .post')
for element in article_elements[:max_articles]:
article = {}
# 提取标题
title_elem = element.select_one('h2, h3, .title, .headline')
if title_elem:
article['title'] = title_elem.get_text(strip=True)
# 提取链接
link_elem = element.find('a')
if link_elem and link_elem.has_attr('href'):
article['url'] = link_elem['href']
# 提取摘要
summary_elem = element.select_one('.summary, .excerpt, .description')
if summary_elem:
article['summary'] = summary_elem.get_text(strip=True)
# 提取日期
date_elem = element.select_one('.date, .time, time')
if date_elem:
if date_elem.has_attr('datetime'):
article['date'] = date_elem['datetime']
else:
article['date'] = date_elem.get_text(strip=True)
if article:
articles.append(article)
return articles
def close(self):
"""关闭浏览器"""
self.driver.quit()
# 使用示例
# scraper = DynamicNewsScraper()
# articles = scraper.scrape_dynamic_news('https://example-news.com', max_articles=10)
# scraper.close()
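
# A slightly safer variant of the usage above (the URL is hypothetical): wrapping the scrape
# in try/finally guarantees the browser is closed even if the page fails to load.
def run_dynamic_scrape(url='https://example-news.com', max_articles=10):
    scraper = DynamicNewsScraper()
    try:
        return scraper.scrape_dynamic_news(url, max_articles=max_articles)
    finally:
        scraper.close()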
```

E-commerce sites expose a wealth of structured data: product listings, prices, ratings and user reviews. This case study shows how to collect and analyze that kind of structured information.

```python
import requests
from bs4 import BeautifulSoup
import re
import json
import csv
from urllib.parse import urljoin, urlparse
import time
class EcommerceScraper:
"""电商商品信息抓取器"""
def __init__(self, base_url):
self.base_url = base_url
self.session = requests.Session()
self.products = []
# 设置请求头
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
})
def extract_price(self, price_text):
"""从文本中提取价格"""
if not price_text:
return 0.0
# 使用正则表达式提取数字
price_pattern = r'[\d,.]+'
matches = re.findall(price_pattern, price_text)
if matches:
# 取第一个匹配的数字
price_str = matches[0].replace(',', '')
try:
return float(price_str)
except:
return 0.0
return 0.0
def extract_rating(self, rating_text):
"""从文本中提取评分"""
if not rating_text:
return 0.0
# 提取数字
rating_pattern = r'[\d.]+'
matches = re.findall(rating_pattern, rating_text)
if matches:
try:
return float(matches[0])
except:
return 0.0
return 0.0
def scrape_search_results(self, search_query, max_pages=3):
"""抓取搜索结果"""
all_products = []
for page in range(1, max_pages + 1):
# 构建搜索URL(根据实际网站调整)
search_url = f"{self.base_url}/search?q={search_query}&page={page}"
print(f"正在抓取第 {page} 页搜索结果: {search_query}")
try:
response = self.session.get(search_url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'lxml')
# 提取商品列表(根据实际网站结构调整选择器)
product_items = soup.select('.product-item, .goods-item, .item')
for item in product_items:
product = self.parse_product_item(item)
if product:
product['search_query'] = search_query
product['result_page'] = page
all_products.append(product)
print(f"第 {page} 页完成,找到 {len(product_items)} 个商品")
# 礼貌性延迟
time.sleep(2)
except Exception as e:
print(f"抓取第 {page} 页失败: {e}")
continue
# 去重(基于商品ID或名称)
unique_products = []
seen_ids = set()
for product in all_products:
product_id = product.get('product_id') or product.get('title')
if product_id and product_id not in seen_ids:
seen_ids.add(product_id)
unique_products.append(product)
self.products.extend(unique_products)
print(f"搜索完成,找到 {len(unique_products)} 个唯一商品")
return unique_products
def parse_product_item(self, item):
"""解析单个商品项"""
product = {}
# 提取商品ID
product_id = item.get('data-product-id') or item.get('data-id')
if product_id:
product['product_id'] = product_id
# 提取商品名称
name_elem = item.select_one('.product-name, .title, .name')
if name_elem:
product['title'] = name_elem.get_text(strip=True)
# 提取价格
price_elem = item.select_one('.price, .current-price, .sale-price')
if price_elem:
price_text = price_elem.get_text(strip=True)
product['price'] = self.extract_price(price_text)
product['price_text'] = price_text
# 提取原价(如果有)
original_price_elem = item.select_one('.original-price, .market-price')
if original_price_elem:
original_price_text = original_price_elem.get_text(strip=True)
product['original_price'] = self.extract_price(original_price_text)
# 提取评分
rating_elem = item.select_one('.rating, .score, .star-rating')
if rating_elem:
rating_text = rating_elem.get_text(strip=True)
product['rating'] = self.extract_rating(rating_text)
# 提取评价数量
review_count_elem = item.select_one('.review-count, .comment-count')
if review_count_elem:
review_text = review_count_elem.get_text(strip=True)
numbers = re.findall(r'\d+', review_text)
if numbers:
product['review_count'] = int(numbers[0])
# 提取店铺名称
shop_elem = item.select_one('.shop-name, .store, .seller')
if shop_elem:
product['shop'] = shop_elem.get_text(strip=True)
# 提取商品图片
img_elem = item.select_one('img')
if img_elem and img_elem.has_attr('src'):
product['image_url'] = img_elem['src']
# 提取商品链接
link_elem = item.find('a')
if link_elem and link_elem.has_attr('href'):
product['url'] = urljoin(self.base_url, link_elem['href'])
return product if product else None
def scrape_product_detail(self, product_url):
"""抓取商品详情页"""
print(f"正在抓取商品详情: {product_url}")
try:
response = self.session.get(product_url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'lxml')
# 提取详细信息
detail = {}
# 提取商品描述
description_elem = soup.select_one('.product-description, .detail, .desc')
if description_elem:
detail['description'] = description_elem.get_text(strip=True)
# 提取规格参数
specs = {}
spec_elements = soup.select('.spec-item, .parameter-item')
for spec in spec_elements:
key_elem = spec.select_one('.key, .name')
value_elem = spec.select_one('.value, .val')
if key_elem and value_elem:
key = key_elem.get_text(strip=True)
value = value_elem.get_text(strip=True)
specs[key] = value
if specs:
detail['specifications'] = specs
# 提取库存信息
stock_elem = soup.select_one('.stock, .inventory')
if stock_elem:
stock_text = stock_elem.get_text(strip=True)
detail['stock'] = stock_text
# 判断是否有货
if '缺货' in stock_text or '无货' in stock_text or '售罄' in stock_text:
detail['in_stock'] = False
else:
detail['in_stock'] = True
# 提取月销量
sales_elem = soup.select_one('.sales, .monthly-sales')
if sales_elem:
sales_text = sales_elem.get_text(strip=True)
numbers = re.findall(r'\d+', sales_text)
if numbers:
detail['monthly_sales'] = int(numbers[0])
# 提取商品图片集
images = []
img_elements = soup.select('.product-image, .main-image img')
for img in img_elements:
if img.has_attr('src'):
src = img['src']
if src.startswith('http'):
images.append(src)
if images:
detail['images'] = images
time.sleep(1) # 礼貌性延迟
return detail
except Exception as e:
print(f"抓取商品详情失败 {product_url}: {e}")
return None
def enrich_products_with_details(self, max_products=10):
"""为商品列表补充详情信息"""
enriched_products = []
for i, product in enumerate(self.products[:max_products], 1):
if 'url' in product:
print(f"正在补充详情 ({i}/{min(len(self.products), max_products)}): {product.get('title', 'N/A')}")
details = self.scrape_product_detail(product['url'])
if details:
product.update(details)
enriched_products.append(product)
return enriched_products
def scrape_product_reviews(self, product_url, max_reviews=20):
"""抓取商品评价"""
reviews = []
# 构建评价页URL(根据实际网站调整)
# 假设评价页URL模式为:商品URL + /reviews?page=1
base_review_url = product_url.rstrip('/') + '/reviews'
for page in range(1, 3): # 抓取前2页评价
review_url = f"{base_review_url}?page={page}"
try:
response = self.session.get(review_url, timeout=10)
soup = BeautifulSoup(response.content, 'lxml')
# 提取评价项(根据实际网站结构调整)
review_items = soup.select('.review-item, .comment-item')
for item in review_items:
if len(reviews) >= max_reviews:
break
review = {}
# 提取用户名
user_elem = item.select_one('.user-name, .author')
if user_elem:
review['user'] = user_elem.get_text(strip=True)
# 提取评分
rating_elem = item.select_one('.review-rating, .star-rating')
if rating_elem:
rating_text = rating_elem.get_text(strip=True)
review['rating'] = self.extract_rating(rating_text)
# 提取评价内容
content_elem = item.select_one('.review-content, .comment-text')
if content_elem:
review['content'] = content_elem.get_text(strip=True)
# 提取评价时间
time_elem = item.select_one('.review-time, .comment-date')
if time_elem:
review['time'] = time_elem.get_text(strip=True)
# 提取有用数
helpful_elem = item.select_one('.helpful-count, .useful-count')
if helpful_elem:
helpful_text = helpful_elem.get_text(strip=True)
numbers = re.findall(r'\d+', helpful_text)
if numbers:
review['helpful_count'] = int(numbers[0])
if review:
reviews.append(review)
time.sleep(1)
except Exception as e:
print(f"抓取评价页 {page} 失败: {e}")
continue
return reviews
def analyze_products(self):
"""分析商品数据"""
if not self.products:
return {}
analysis = {
'total_products': len(self.products),
'price_range': {
'min': float('inf'),
'max': 0,
'avg': 0
},
'rating_stats': {
'avg': 0,
'distribution': {
'5星': 0,
'4星': 0,
'3星': 0,
'2星': 0,
'1星': 0
}
},
'shop_distribution': {}
}
total_price = 0
total_rating = 0
rating_count = 0
for product in self.products:
# 价格分析
price = product.get('price', 0)
if price > 0:
analysis['price_range']['min'] = min(analysis['price_range']['min'], price)
analysis['price_range']['max'] = max(analysis['price_range']['max'], price)
total_price += price
# 评分分析
rating = product.get('rating', 0)
if rating > 0:
total_rating += rating
rating_count += 1
# 评分分布
if rating >= 4.5:
analysis['rating_stats']['distribution']['5星'] += 1
elif rating >= 3.5:
analysis['rating_stats']['distribution']['4星'] += 1
elif rating >= 2.5:
analysis['rating_stats']['distribution']['3星'] += 1
elif rating >= 1.5:
analysis['rating_stats']['distribution']['2星'] += 1
else:
analysis['rating_stats']['distribution']['1星'] += 1
# 店铺分布
shop = product.get('shop')
if shop:
if shop in analysis['shop_distribution']:
analysis['shop_distribution'][shop] += 1
else:
analysis['shop_distribution'][shop] = 1
        # Average price over products that actually carry a price, to avoid skewing by zeros
        priced_count = sum(1 for p in self.products if p.get('price', 0) > 0)
        if priced_count > 0:
            analysis['price_range']['avg'] = total_price / priced_count
        else:
            analysis['price_range']['min'] = 0
if rating_count > 0:
analysis['rating_stats']['avg'] = total_rating / rating_count
return analysis
def save_to_csv(self, filename='products.csv'):
"""保存到CSV"""
if not self.products:
return False
fieldnames = [
'product_id', 'title', 'price', 'original_price', 'rating',
'review_count', 'shop', 'url', 'image_url'
]
try:
with open(filename, 'w', newline='', encoding='utf-8-sig') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for product in self.products:
# 只写入基本字段
row = {field: product.get(field, '') for field in fieldnames}
writer.writerow(row)
print(f"数据已保存到 {filename}")
return True
except Exception as e:
print(f"保存CSV失败: {e}")
return False
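
# save_to_csv keeps only the flat basic fields. A JSON dump (sketch below, helper name is
# ours) also preserves the nested data added by enrich_products_with_details(), such as
# specifications, image lists and stock information.
def save_products_to_json(scraper, filename='products.json'):
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(scraper.products, f, ensure_ascii=False, indent=2)
    print(f"Saved {len(scraper.products)} products to {filename}")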
# 使用示例
def example_ecommerce_scraping():
"""电商抓取示例"""
# 创建抓取器(以示例网站为例)
scraper = EcommerceScraper(base_url='https://example-shop.com')
# 搜索商品
search_results = scraper.scrape_search_results(
search_query='笔记本电脑',
max_pages=2
)
if search_results:
# 补充详情信息
enriched_products = scraper.enrich_products_with_details(max_products=5)
# 分析数据
analysis = scraper.analyze_products()
print("\n=== 数据分析 ===")
print(f"总商品数: {analysis['total_products']}")
print(f"价格范围: ¥{analysis['price_range']['min']:.2f} - ¥{analysis['price_range']['max']:.2f}")
print(f"平均价格: ¥{analysis['price_range']['avg']:.2f}")
print(f"平均评分: {analysis['rating_stats']['avg']:.2f}")
# 显示前3个商品
print("\n=== 商品示例 ===")
for i, product in enumerate(search_results[:3], 1):
print(f"\n{i}. {product.get('title', 'N/A')}")
print(f" 价格: ¥{product.get('price', 0):.2f}")
print(f" 评分: {product.get('rating', 0):.1f} ({product.get('review_count', 0)}条评价)")
print(f" 店铺: {product.get('shop', '未知')}")
# 保存数据
scraper.save_to_csv('products_sample.csv')
return search_results
if __name__ == "__main__":
    # Template only: swap in a real base URL and adjust the selectors before running
    print("这是一个电商抓取器的示例代码")
    print("实际使用时需要根据目标网站的结构调整选择器")
    # example_ecommerce_scraping()
```
The three projects are summarized below:

| Project | Core techniques | Main challenges | Possible extensions |
|---|---|---|---|
| Douban Movie Top250 | Pagination handling, data cleaning, regular expressions | Encoding issues, anti-scraping limits | Image downloads, database storage, API development |
| News site scraping | CSS selectors, URL handling, structuring data | Dynamically loaded content, varied site structures | Sentiment analysis, automatic summaries, trend detection |
| E-commerce product scraping | Price extraction, rating analysis, product matching | Anti-scraping mechanisms, large data volumes | Price monitoring, competitor analysis, recommendation systems |
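
To make one of the extension directions in the table concrete, price monitoring can be layered on top of the e-commerce scraper with very little extra code: keep a snapshot of the last observed prices on disk and flag items whose price has dropped. The sketch below is illustrative only; the `check_price_drops` helper, the snapshot file name and the 5% threshold are our own choices, and `products` is the list of dicts produced by `EcommerceScraper`.

```python
import json
import os

def check_price_drops(products, snapshot_file='price_snapshot.json', threshold=0.05):
    """Compare freshly scraped prices with the last snapshot and report drops."""
    previous = {}
    if os.path.exists(snapshot_file):
        with open(snapshot_file, 'r', encoding='utf-8') as f:
            previous = json.load(f)

    drops = []
    current = {}
    for product in products:
        pid = str(product.get('product_id') or product.get('title') or '')
        price = product.get('price', 0)
        if not pid or price <= 0:
            continue
        current[pid] = price
        old_price = previous.get(pid)
        # Flag products whose price fell by more than the threshold (5% by default)
        if old_price and price < old_price * (1 - threshold):
            drops.append((product.get('title', pid), old_price, price))

    # Persist current prices for the next run
    with open(snapshot_file, 'w', encoding='utf-8') as f:
        json.dump(current, f, ensure_ascii=False, indent=2)
    return drops
```

Run after each scrape (for example from a cron job) and the one-off scraper becomes a simple price-monitoring tool.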