BeautifulSoup实战:网页数据抓取案例

理论知识掌握后,让我们通过三个完整的实战案例来巩固BeautifulSoup的使用技巧。本章将实现豆瓣电影Top250抓取、新闻网站内容提取和电商商品信息抓取,涵盖常见的数据抓取场景。

重要提醒:实际抓取网站数据时,请务必遵守网站的robots.txt协议,尊重版权,合理控制请求频率,不要给目标网站服务器造成过大压力。
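
在动手之前,可以先用标准库 urllib.robotparser 检查目标路径是否允许抓取,并读取站点声明的抓取间隔。下面是一个简单示意(以豆瓣的 robots.txt 为例,User-Agent 为假设名称,实际规则以网站当前配置为准):

from urllib.robotparser import RobotFileParser

# 读取目标站点的robots.txt,判断某个URL是否允许抓取
rp = RobotFileParser()
rp.set_url("https://movie.douban.com/robots.txt")
rp.read()

user_agent = "MyScraperBot"  # 假设的User-Agent,仅作示意
url = "https://movie.douban.com/top250?start=0"

if rp.can_fetch(user_agent, url):
    print("robots.txt 允许抓取该URL")
else:
    print("robots.txt 不允许抓取该URL,请勿继续")

# 若站点声明了Crawl-delay,可据此控制请求间隔(未声明时返回None)
print("建议的抓取间隔:", rp.crawl_delay(user_agent))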

案例一:豆瓣电影Top250数据抓取

这是最经典的网页抓取案例之一,我们将完整抓取豆瓣电影Top250的所有信息。

1.1 分析页面结构

首先分析豆瓣电影Top250页面的HTML结构:

  • URL模式:https://movie.douban.com/top250?start=0(分页参数:start,URL生成方式见列表后的示意)
  • 每页25部电影,共10页
  • 每部电影的信息包含在 <div class="item"> 容器中
  • 需要提取的信息:排名、电影名称、评分、评价人数、导演、年份、地区、类型、简介
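
根据上面的URL模式,可以预先推算出全部10页列表页的地址,方便后续循环抓取:

# start参数依次为0, 25, 50, ..., 225,对应第1~10页
base_url = "https://movie.douban.com/top250"
page_urls = [f"{base_url}?start={(page - 1) * 25}" for page in range(1, 11)]

print(page_urls[0])   # https://movie.douban.com/top250?start=0
print(page_urls[-1])  # https://movie.douban.com/top250?start=225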

1.2 完整代码实现

import requests
from bs4 import BeautifulSoup
import time
import csv
import re

class DoubanMovieScraper:
    """豆瓣电影Top250抓取器"""

    def __init__(self):
        self.base_url = "https://movie.douban.com/top250"
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Connection': 'keep-alive',
        }
        self.movies = []

    def get_page(self, page_num):
        """获取指定页码的页面内容"""
        start = (page_num - 1) * 25
        url = f"{self.base_url}?start={start}"

        try:
            print(f"正在获取第 {page_num} 页...")
            response = requests.get(url, headers=self.headers, timeout=10)
            response.raise_for_status()

            # 设置编码
            response.encoding = 'utf-8'

            return response.text
        except Exception as e:
            print(f"获取第 {page_num} 页失败: {e}")
            return None

    def parse_movie_item(self, item):
        """解析单个电影项目"""
        movie = {}

        # 获取排名
        rank_elem = item.find('em')
        movie['rank'] = rank_elem.text if rank_elem else 'N/A'

        # 获取电影标题(find只返回第一个span.title,即中文片名)
        title_elem = item.find('span', class_='title')
        if title_elem:
            movie['title'] = title_elem.text.strip()

        # 获取其他译名/别名(class="other")
        other_title_elem = item.find('span', class_='other')
        if other_title_elem:
            movie['other_title'] = other_title_elem.text.strip()

        # 获取评分
        rating_elem = item.find('span', class_='rating_num')
        movie['rating'] = rating_elem.text if rating_elem else 'N/A'

        # 获取评价人数(star区块的最后一个span,形如"2000000人评价")
        star_div = item.find('div', class_='star')
        if star_div and star_div.find_all('span'):
            rating_count_text = star_div.find_all('span')[-1].text
            # 提取数字
            numbers = re.findall(r'\d+', rating_count_text)
            movie['rating_count'] = numbers[0] if numbers else '0'
        else:
            movie['rating_count'] = '0'

        # 获取导演、年份、地区、类型等信息
        info_elem = item.find('div', class_='bd').find('p')
        if info_elem:
            info_text = info_elem.text.strip()

            # 分割信息行
            lines = info_text.split('\n')
            if len(lines) >= 2:
                # 第一行:导演和主演,这里只保留导演部分
                director_line = lines[0].strip()
                director_part = re.split(r'\s*主演', director_line)[0]
                movie['director'] = re.sub(r'^导演[::]\s*', '', director_part).strip()

                # 第二行:年份/地区/类型
                details_line = lines[1].strip()
                parts = details_line.split('/')
                if len(parts) >= 3:
                    movie['year'] = parts[0].strip()
                    movie['region'] = parts[1].strip()
                    movie['genre'] = parts[2].strip()

        # 获取简介
        quote_elem = item.find('span', class_='inq')
        movie['quote'] = quote_elem.text if quote_elem else 'N/A'

        # 获取电影链接
        link_elem = item.find('a')
        movie['link'] = link_elem['href'] if link_elem and link_elem.has_attr('href') else ''

        # 获取图片链接
        img_elem = item.find('img')
        movie['image_url'] = img_elem['src'] if img_elem and img_elem.has_attr('src') else ''

        return movie

    def parse_page(self, html_content):
        """解析页面内容,提取电影信息"""
        if not html_content:
            return []

        soup = BeautifulSoup(html_content, 'lxml')
        movie_items = soup.find_all('div', class_='item')

        movies_on_page = []
        for item in movie_items:
            movie = self.parse_movie_item(item)
            if movie:
                movies_on_page.append(movie)

        return movies_on_page

    def scrape_all_pages(self, max_pages=10):
        """抓取所有页面"""
        print("开始抓取豆瓣电影Top250...")

        for page_num in range(1, max_pages + 1):
            # 获取页面
            html = self.get_page(page_num)
            if not html:
                continue

            # 解析页面
            movies = self.parse_page(html)
            self.movies.extend(movies)

            print(f"第 {page_num} 页完成,找到 {len(movies)} 部电影")

            # 礼貌性延迟,避免请求过快
            time.sleep(2)

        print(f"\n抓取完成!总共找到 {len(self.movies)} 部电影")
        return self.movies

    def save_to_csv(self, filename='douban_top250.csv'):
        """保存数据到CSV文件"""
        if not self.movies:
            print("没有数据可保存")
            return

        # 定义CSV字段
        fieldnames = [
            'rank', 'title', 'other_title', 'rating', 'rating_count',
            'director', 'year', 'region', 'genre', 'quote', 'link', 'image_url'
        ]

        try:
            with open(filename, 'w', newline='', encoding='utf-8-sig') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writeheader()

                for movie in self.movies:
                    writer.writerow(movie)

            print(f"数据已保存到 {filename}")
            return True
        except Exception as e:
            print(f"保存CSV文件失败: {e}")
            return False

    def save_to_json(self, filename='douban_top250.json'):
        """保存数据到JSON文件"""
        import json

        if not self.movies:
            print("没有数据可保存")
            return

        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(self.movies, f, ensure_ascii=False, indent=2)

            print(f"数据已保存到 {filename}")
            return True
        except Exception as e:
            print(f"保存JSON文件失败: {e}")
            return False

    def analyze_data(self):
        """数据分析示例"""
        if not self.movies:
            print("没有数据可分析")
            return

        print("\n=== 数据分析 ===")

        # 统计每年的电影数量
        year_counts = {}
        for movie in self.movies:
            year = movie.get('year', '未知')
            if year in year_counts:
                year_counts[year] += 1
            else:
                year_counts[year] = 1

        # 找出产量最高的年份
        top_years = sorted(year_counts.items(), key=lambda x: x[1], reverse=True)[:5]
        print("电影数量最多的年份(前5名):")
        for year, count in top_years:
            print(f"  {year}: {count} 部")

        # 统计导演作品数量
        director_counts = {}
        for movie in self.movies:
            director = movie.get('director', '').split(' ')[0]  # 取空格前的部分,通常是第一位导演的中文名
            if director and director != 'N/A':
                if director in director_counts:
                    director_counts[director] += 1
                else:
                    director_counts[director] = 1

        # 找出作品最多的导演
        top_directors = sorted(director_counts.items(), key=lambda x: x[1], reverse=True)[:5]
        print("\n作品最多的导演(前5名):")
        for director, count in top_directors:
            print(f"  {director}: {count} 部")

        # 平均评分
        total_rating = 0
        count = 0
        for movie in self.movies:
            rating = movie.get('rating', '0')
            if rating != 'N/A':
                try:
                    total_rating += float(rating)
                    count += 1
                except:
                    pass

        if count > 0:
            avg_rating = total_rating / count
            print(f"\nTop250平均评分: {avg_rating:.2f}")

        # 评分分布
        rating_dist = {
            '9分以上': 0,
            '8-9分': 0,
            '7-8分': 0,
            '7分以下': 0
        }

        for movie in self.movies:
            rating = movie.get('rating', '0')
            if rating != 'N/A':
                try:
                    rating_float = float(rating)
                    if rating_float >= 9:
                        rating_dist['9分以上'] += 1
                    elif rating_float >= 8:
                        rating_dist['8-9分'] += 1
                    elif rating_float >= 7:
                        rating_dist['7-8分'] += 1
                    else:
                        rating_dist['7分以下'] += 1
                except:
                    pass

        print("\n评分分布:")
        for category, count in rating_dist.items():
            print(f"  {category}: {count} 部")

# 使用示例
if __name__ == "__main__":
    scraper = DoubanMovieScraper()

    # 抓取数据(这里只抓取前2页作为示例,完整抓取请设置为10页)
    movies = scraper.scrape_all_pages(max_pages=2)

    if movies:
        # 保存数据
        scraper.save_to_csv('douban_top250_sample.csv')
        scraper.save_to_json('douban_top250_sample.json')

        # 数据分析
        scraper.analyze_data()

        # 显示前3部电影信息
        print("\n=== 前3部电影信息 ===")
        for i, movie in enumerate(movies[:3], 1):
            print(f"\n{i}. {movie.get('title', 'N/A')}")
            print(f"   排名: {movie.get('rank', 'N/A')}")
            print(f"   评分: {movie.get('rating', 'N/A')} ({movie.get('rating_count', '0')}人评价)")
            print(f"   导演: {movie.get('director', 'N/A')}")
            print(f"   年份/地区/类型: {movie.get('year', 'N/A')}/{movie.get('region', 'N/A')}/{movie.get('genre', 'N/A')}")
            print(f"   简介: {movie.get('quote', 'N/A')}")

1.3 代码解析与优化建议

代码亮点解析
  1. 面向对象设计:使用类封装,代码结构清晰
  2. 异常处理:全面处理请求和解析过程中的异常
  3. 数据清洗:使用正则表达式提取评价人数中的数字
  4. 多种输出格式:支持CSV和JSON格式保存
  5. 数据分析功能:内置简单的统计分析

性能优化建议

# 1. 使用Session减少连接开销
session = requests.Session()
session.headers.update(headers)

# 2. 实现并发抓取(使用线程池)
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_page_concurrently(scraper, page_nums, max_workers=3):
    """并发抓取多页(scraper为DoubanMovieScraper实例)"""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 提交所有任务
        future_to_page = {
            executor.submit(scraper.get_page, page_num): page_num
            for page_num in page_nums
        }

        results = []
        for future in as_completed(future_to_page):
            page_num = future_to_page[future]
            try:
                html = future.result()
                results.append((page_num, html))
            except Exception as e:
                print(f"第 {page_num} 页抓取失败: {e}")

        return results

# 3. 实现断点续传
import json
import os

class ResumeScraper(DoubanMovieScraper):
    """支持断点续传的爬虫"""

    def __init__(self, state_file='scraper_state.json'):
        super().__init__()
        self.state_file = state_file
        self.load_state()

    def load_state(self):
        """加载爬取状态"""
        if os.path.exists(self.state_file):
            try:
                with open(self.state_file, 'r', encoding='utf-8') as f:
                    state = json.load(f)
                    self.movies = state.get('movies', [])
                    self.last_page = state.get('last_page', 0)
                    print(f"加载状态:已抓取 {len(self.movies)} 部电影,最后抓取页面:{self.last_page}")
            except Exception as e:
                print(f"加载状态失败: {e}")
                self.movies = []
                self.last_page = 0
        else:
            self.movies = []
            self.last_page = 0

    def save_state(self):
        """保存爬取状态"""
        state = {
            'movies': self.movies,
            'last_page': self.last_page,
            'timestamp': time.time()
        }
        try:
            with open(self.state_file, 'w', encoding='utf-8') as f:
                json.dump(state, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"保存状态失败: {e}")

    def scrape_with_resume(self, max_pages=10):
        """支持断点续传的抓取"""
        start_page = self.last_page + 1

        for page_num in range(start_page, max_pages + 1):
            print(f"正在抓取第 {page_num} 页...")

            html = self.get_page(page_num)
            if not html:
                continue

            movies_on_page = self.parse_page(html)
            self.movies.extend(movies_on_page)
            self.last_page = page_num

            # 每抓取一页保存一次状态
            self.save_state()

            print(f"第 {page_num} 页完成,找到 {len(movies_on_page)} 部电影")
            time.sleep(2)

        print(f"\n抓取完成!总共找到 {len(self.movies)} 部电影")
        return self.movies

案例二:新闻网站内容抓取

新闻网站通常有更复杂的结构和更多的动态内容,这个案例将演示如何处理这类网站。

2.1 新闻网站抓取器实现

import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import time
from datetime import datetime
import json

class NewsScraper:
    """通用新闻网站抓取器"""

    def __init__(self, base_url, config=None):
        self.base_url = base_url
        self.session = requests.Session()
        self.articles = []

        # 默认配置
        self.config = {
            'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'timeout': 10,
            'delay': 1,
            'max_articles': 50,
            'selectors': {
                'article': 'article, .news-item, .post, .story',
                'title': 'h1, h2, .title, .headline',
                'content': '.content, .article-body, .post-content, .story-content',
                'author': '.author, .byline, .writer',
                'date': '.date, .time, .published, time',
                'category': '.category, .section, .tag',
                'summary': '.summary, .excerpt, .description'
            }
        }

        # 合并自定义配置
        if config:
            self.config.update(config)

        # 设置请求头
        self.session.headers.update({
            'User-Agent': self.config['user_agent'],
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        })

    def normalize_url(self, url):
        """规范化URL"""
        if not urlparse(url).scheme:
            url = urljoin(self.base_url, url)
        return url

    def fetch_page(self, url):
        """获取页面内容"""
        try:
            response = self.session.get(url, timeout=self.config['timeout'])
            response.raise_for_status()

            # 自动检测编码
            response.encoding = response.apparent_encoding

            return response.text
        except Exception as e:
            print(f"获取页面失败 {url}: {e}")
            return None

    def extract_with_selectors(self, soup, selector_key, default=''):
        """使用多种选择器提取内容"""
        selectors = self.config['selectors'].get(selector_key, '')

        if isinstance(selectors, str):
            selectors = [s.strip() for s in selectors.split(',')]

        for selector in selectors:
            if selector:
                element = soup.select_one(selector)
                if element:
                    # 处理特殊元素
                    if selector == 'time' and element.has_attr('datetime'):
                        return element['datetime']
                    return element.get_text(strip=True)

        return default

    def parse_article_page(self, url):
        """解析文章详情页"""
        print(f"正在解析文章: {url}")

        html = self.fetch_page(url)
        if not html:
            return None

        soup = BeautifulSoup(html, 'lxml')

        article = {
            'url': url,
            'title': self.extract_with_selectors(soup, 'title'),
            'content': self.extract_with_selectors(soup, 'content'),
            'author': self.extract_with_selectors(soup, 'author'),
            'date': self.extract_with_selectors(soup, 'date'),
            'category': self.extract_with_selectors(soup, 'category'),
            'summary': self.extract_with_selectors(soup, 'summary'),
            'images': [],
            'links': [],
            'scraped_at': datetime.now().isoformat()
        }

        # 提取图片
        for img in soup.select('img'):
            src = img.get('src', '')
            if src:
                src = self.normalize_url(src)
                article['images'].append({
                    'src': src,
                    'alt': img.get('alt', ''),
                    'title': img.get('title', '')
                })

        # 提取内部链接(相对链接的netloc为空,同样视为站内链接)
        for a in soup.select('a[href]'):
            href = a.get('href', '')
            if not href or href.startswith(('javascript:', '#')):
                continue
            if urlparse(href).netloc in ('', urlparse(self.base_url).netloc):
                article['links'].append({
                    'url': self.normalize_url(href),
                    'text': a.get_text(strip=True)[:100]
                })

        # 清理内容
        if article['content']:
            # 移除多余空白
            article['content'] = ' '.join(article['content'].split())
            # 限制长度
            if len(article['content']) > 5000:
                article['content'] = article['content'][:5000] + '...'

        return article

    def extract_article_links(self, soup):
        """从列表页提取文章链接"""
        article_selectors = self.config['selectors']['article']

        if isinstance(article_selectors, str):
            article_selectors = [s.strip() for s in article_selectors.split(',')]

        # 用列表+集合去重,保留链接在页面中的出现顺序
        links = []
        seen = set()

        for selector in article_selectors:
            if selector:
                for element in soup.select(selector):
                    # 查找链接
                    link_elem = element.find('a')
                    if link_elem and link_elem.has_attr('href'):
                        href = link_elem['href']
                        if href and not href.startswith('javascript:'):
                            full_url = self.normalize_url(href)
                            if full_url not in seen:
                                seen.add(full_url)
                                links.append(full_url)

        return links

    def scrape_category(self, category_url, max_articles=None):
        """抓取分类页面下的所有文章"""
        if max_articles is None:
            max_articles = self.config['max_articles']

        print(f"开始抓取分类: {category_url}")

        # 获取分类页面
        html = self.fetch_page(category_url)
        if not html:
            return []

        soup = BeautifulSoup(html, 'lxml')

        # 提取文章链接
        article_urls = self.extract_article_links(soup)
        print(f"找到 {len(article_urls)} 个文章链接")

        # 限制抓取数量
        article_urls = article_urls[:max_articles]

        # 逐个抓取文章
        articles = []
        for i, url in enumerate(article_urls, 1):
            print(f"正在处理第 {i}/{len(article_urls)} 篇文章")

            article = self.parse_article_page(url)
            if article:
                articles.append(article)

            # 礼貌性延迟
            time.sleep(self.config['delay'])

        print(f"分类抓取完成,成功获取 {len(articles)} 篇文章")
        return articles

    def scrape_multiple_categories(self, category_urls):
        """抓取多个分类"""
        all_articles = []

        for category_url in category_urls:
            articles = self.scrape_category(category_url)
            all_articles.extend(articles)

            # 避免请求过快
            time.sleep(self.config['delay'] * 2)

        # 去重(基于URL)
        unique_articles = []
        seen_urls = set()

        for article in all_articles:
            if article['url'] not in seen_urls:
                seen_urls.add(article['url'])
                unique_articles.append(article)

        print(f"总共抓取 {len(unique_articles)} 篇唯一文章")
        self.articles = unique_articles
        return unique_articles

    def save_articles(self, filename='news_articles.json'):
        """保存文章数据"""
        if not self.articles:
            print("没有文章数据可保存")
            return False

        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(self.articles, f, ensure_ascii=False, indent=2)

            print(f"文章数据已保存到 {filename}")
            return True
        except Exception as e:
            print(f"保存文章数据失败: {e}")
            return False

    def generate_summary(self):
        """生成抓取摘要"""
        if not self.articles:
            return "没有数据"

        summary = {
            'total_articles': len(self.articles),
            'categories': set(),
            'authors': set(),
            'date_range': {
                'earliest': None,
                'latest': None
            }
        }

        for article in self.articles:
            # 收集分类
            if article['category']:
                summary['categories'].add(article['category'])

            # 收集作者
            if article['author']:
                summary['authors'].add(article['author'])

            # 更新日期范围
            if article['date']:
                try:
                    date_obj = datetime.fromisoformat(article['date'].replace('Z', '+00:00'))
                    if not summary['date_range']['earliest'] or date_obj < summary['date_range']['earliest']:
                        summary['date_range']['earliest'] = date_obj
                    if not summary['date_range']['latest'] or date_obj > summary['date_range']['latest']:
                        summary['date_range']['latest'] = date_obj
                except:
                    pass

        summary['categories'] = list(summary['categories'])
        summary['authors'] = list(summary['authors'])

        return summary

# 示例配置和用法
def example_news_scraping():
    """新闻网站抓取示例"""

    # 配置抓取器(以示例网站为例)
    config = {
        'max_articles': 10,  # 每个分类最多抓取10篇文章
        'delay': 2,  # 请求延迟
        'selectors': {
            'article': '.post, .article-item, .news-item',
            'title': 'h1.entry-title, h1.article-title, .title',
            'content': '.entry-content, .article-content, .content',
            'author': '.author, .byline, .post-author',
            'date': '.post-date, .article-date, time',
            'category': '.category, .post-category, .section',
            'summary': '.entry-summary, .excerpt, .description'
        }
    }

    # 创建抓取器
    scraper = NewsScraper(
        base_url='https://example-news.com',
        config=config
    )

    # 定义要抓取的分类
    categories = [
        'https://example-news.com/category/technology',
        'https://example-news.com/category/business',
        'https://example-news.com/category/science'
    ]

    # 开始抓取
    articles = scraper.scrape_multiple_categories(categories)

    if articles:
        # 保存数据
        scraper.save_articles('news_data.json')

        # 生成摘要
        summary = scraper.generate_summary()
        print("\n=== 抓取摘要 ===")
        print(f"总文章数: {summary['total_articles']}")
        print(f"分类数: {len(summary['categories'])}")
        print(f"作者数: {len(summary['authors'])}")

        # 显示前3篇文章
        print("\n=== 示例文章 ===")
        for i, article in enumerate(articles[:3], 1):
            print(f"\n{i}. {article['title']}")
            print(f"   作者: {article['author'] or '未知'}")
            print(f"   日期: {article['date'] or '未知'}")
            print(f"   分类: {article['category'] or '未知'}")
            print(f"   摘要: {article['summary'][:100] if article['summary'] else '无'}...")

    return articles

# 注意:实际使用时,需要根据目标网站的结构调整选择器配置
if __name__ == "__main__":
    # 这是一个示例,实际使用时需要替换为真实的URL和选择器
    print("这是一个新闻抓取器的示例代码")
    print("实际使用时需要根据目标网站的结构调整选择器配置")
    # example_news_scraping()

2.2 处理动态内容

# 对于使用JavaScript动态加载内容的网站,可能需要使用Selenium
# 以下是使用Selenium配合BeautifulSoup的示例

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
import time

class DynamicNewsScraper:
    """处理动态加载内容的新闻抓取器"""

    def __init__(self):
        # 配置Selenium
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')  # 无头模式
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')

        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 10)

    def scroll_to_load_content(self, scroll_pause_time=2, max_scrolls=10):
        """滚动页面加载更多内容"""
        last_height = self.driver.execute_script("return document.body.scrollHeight")

        for _ in range(max_scrolls):
            # 滚动到底部
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

            # 等待加载
            time.sleep(scroll_pause_time)

            # 计算新的滚动高度
            new_height = self.driver.execute_script("return document.body.scrollHeight")

            if new_height == last_height:
                break

            last_height = new_height

    def click_load_more(self, button_selector):
        """点击"加载更多"按钮"""
        try:
            load_more_button = self.wait.until(
                EC.element_to_be_clickable((By.CSS_SELECTOR, button_selector))
            )
            load_more_button.click()
            time.sleep(2)  # 等待内容加载
            return True
        except:
            return False

    def scrape_dynamic_news(self, url, max_articles=20):
        """抓取动态加载的新闻"""
        print(f"访问动态页面: {url}")
        self.driver.get(url)

        # 等待页面初始加载
        time.sleep(3)

        # 滚动加载更多内容
        self.scroll_to_load_content()

        # 尝试点击"加载更多"按钮(如果有)
        # 注意::contains() 不是标准CSS选择器,Selenium的CSS定位并不支持,
        # 如需按文字匹配可改用XPath,例如 //button[contains(text(), "更多")]
        load_more_selectors = [
            '.load-more', '.more-articles', '.see-more'
        ]

        for selector in load_more_selectors:
            if self.click_load_more(selector):
                # 再次滚动
                self.scroll_to_load_content()

        # 获取页面源码
        page_source = self.driver.page_source

        # 使用BeautifulSoup解析
        soup = BeautifulSoup(page_source, 'lxml')

        # 提取文章信息(根据实际网站结构调整)
        articles = []
        article_elements = soup.select('article, .news-item, .post')

        for element in article_elements[:max_articles]:
            article = {}

            # 提取标题
            title_elem = element.select_one('h2, h3, .title, .headline')
            if title_elem:
                article['title'] = title_elem.get_text(strip=True)

            # 提取链接
            link_elem = element.find('a')
            if link_elem and link_elem.has_attr('href'):
                article['url'] = link_elem['href']

            # 提取摘要
            summary_elem = element.select_one('.summary, .excerpt, .description')
            if summary_elem:
                article['summary'] = summary_elem.get_text(strip=True)

            # 提取日期
            date_elem = element.select_one('.date, .time, time')
            if date_elem:
                if date_elem.has_attr('datetime'):
                    article['date'] = date_elem['datetime']
                else:
                    article['date'] = date_elem.get_text(strip=True)

            if article:
                articles.append(article)

        return articles

    def close(self):
        """关闭浏览器"""
        self.driver.quit()

# 使用示例
# scraper = DynamicNewsScraper()
# articles = scraper.scrape_dynamic_news('https://example-news.com', max_articles=10)
# scraper.close()

案例三:电商商品信息抓取

电商网站通常包含丰富的商品信息和用户评价,这个案例将演示如何抓取这类结构化数据。

3.1 电商商品抓取器实现

import requests
from bs4 import BeautifulSoup
import re
import json
import csv
from urllib.parse import urljoin, urlparse
import time

class EcommerceScraper:
    """电商商品信息抓取器"""

    def __init__(self, base_url):
        self.base_url = base_url
        self.session = requests.Session()
        self.products = []

        # 设置请求头
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        })

    def extract_price(self, price_text):
        """从文本中提取价格"""
        if not price_text:
            return 0.0

        # 使用正则表达式提取数字
        price_pattern = r'[\d,.]+'
        matches = re.findall(price_pattern, price_text)

        if matches:
            # 取第一个匹配的数字
            price_str = matches[0].replace(',', '')
            try:
                return float(price_str)
            except:
                return 0.0

        return 0.0

    def extract_rating(self, rating_text):
        """从文本中提取评分"""
        if not rating_text:
            return 0.0

        # 提取数字
        rating_pattern = r'[\d.]+'
        matches = re.findall(rating_pattern, rating_text)

        if matches:
            try:
                return float(matches[0])
            except:
                return 0.0

        return 0.0

    def scrape_search_results(self, search_query, max_pages=3):
        """抓取搜索结果"""
        all_products = []

        for page in range(1, max_pages + 1):
            # 构建搜索URL(根据实际网站调整)
            search_url = f"{self.base_url}/search?q={search_query}&page={page}"

            print(f"正在抓取第 {page} 页搜索结果: {search_query}")

            try:
                response = self.session.get(search_url, timeout=10)
                response.raise_for_status()

                soup = BeautifulSoup(response.content, 'lxml')

                # 提取商品列表(根据实际网站结构调整选择器)
                product_items = soup.select('.product-item, .goods-item, .item')

                for item in product_items:
                    product = self.parse_product_item(item)
                    if product:
                        product['search_query'] = search_query
                        product['result_page'] = page
                        all_products.append(product)

                print(f"第 {page} 页完成,找到 {len(product_items)} 个商品")

                # 礼貌性延迟
                time.sleep(2)

            except Exception as e:
                print(f"抓取第 {page} 页失败: {e}")
                continue

        # 去重(基于商品ID或名称)
        unique_products = []
        seen_ids = set()

        for product in all_products:
            product_id = product.get('product_id') or product.get('title')
            if product_id and product_id not in seen_ids:
                seen_ids.add(product_id)
                unique_products.append(product)

        self.products.extend(unique_products)
        print(f"搜索完成,找到 {len(unique_products)} 个唯一商品")
        return unique_products

    def parse_product_item(self, item):
        """解析单个商品项"""
        product = {}

        # 提取商品ID
        product_id = item.get('data-product-id') or item.get('data-id')
        if product_id:
            product['product_id'] = product_id

        # 提取商品名称
        name_elem = item.select_one('.product-name, .title, .name')
        if name_elem:
            product['title'] = name_elem.get_text(strip=True)

        # 提取价格
        price_elem = item.select_one('.price, .current-price, .sale-price')
        if price_elem:
            price_text = price_elem.get_text(strip=True)
            product['price'] = self.extract_price(price_text)
            product['price_text'] = price_text

        # 提取原价(如果有)
        original_price_elem = item.select_one('.original-price, .market-price')
        if original_price_elem:
            original_price_text = original_price_elem.get_text(strip=True)
            product['original_price'] = self.extract_price(original_price_text)

        # 提取评分
        rating_elem = item.select_one('.rating, .score, .star-rating')
        if rating_elem:
            rating_text = rating_elem.get_text(strip=True)
            product['rating'] = self.extract_rating(rating_text)

        # 提取评价数量
        review_count_elem = item.select_one('.review-count, .comment-count')
        if review_count_elem:
            review_text = review_count_elem.get_text(strip=True)
            numbers = re.findall(r'\d+', review_text)
            if numbers:
                product['review_count'] = int(numbers[0])

        # 提取店铺名称
        shop_elem = item.select_one('.shop-name, .store, .seller')
        if shop_elem:
            product['shop'] = shop_elem.get_text(strip=True)

        # 提取商品图片
        img_elem = item.select_one('img')
        if img_elem and img_elem.has_attr('src'):
            product['image_url'] = img_elem['src']

        # 提取商品链接
        link_elem = item.find('a')
        if link_elem and link_elem.has_attr('href'):
            product['url'] = urljoin(self.base_url, link_elem['href'])

        return product if product else None

    def scrape_product_detail(self, product_url):
        """抓取商品详情页"""
        print(f"正在抓取商品详情: {product_url}")

        try:
            response = self.session.get(product_url, timeout=10)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'lxml')

            # 提取详细信息
            detail = {}

            # 提取商品描述
            description_elem = soup.select_one('.product-description, .detail, .desc')
            if description_elem:
                detail['description'] = description_elem.get_text(strip=True)

            # 提取规格参数
            specs = {}
            spec_elements = soup.select('.spec-item, .parameter-item')
            for spec in spec_elements:
                key_elem = spec.select_one('.key, .name')
                value_elem = spec.select_one('.value, .val')

                if key_elem and value_elem:
                    key = key_elem.get_text(strip=True)
                    value = value_elem.get_text(strip=True)
                    specs[key] = value

            if specs:
                detail['specifications'] = specs

            # 提取库存信息
            stock_elem = soup.select_one('.stock, .inventory')
            if stock_elem:
                stock_text = stock_elem.get_text(strip=True)
                detail['stock'] = stock_text

                # 判断是否有货
                if '缺货' in stock_text or '无货' in stock_text or '售罄' in stock_text:
                    detail['in_stock'] = False
                else:
                    detail['in_stock'] = True

            # 提取月销量
            sales_elem = soup.select_one('.sales, .monthly-sales')
            if sales_elem:
                sales_text = sales_elem.get_text(strip=True)
                numbers = re.findall(r'\d+', sales_text)
                if numbers:
                    detail['monthly_sales'] = int(numbers[0])

            # 提取商品图片集
            images = []
            img_elements = soup.select('.product-image, .main-image img')
            for img in img_elements:
                if img.has_attr('src'):
                    src = img['src']
                    if src.startswith('http'):
                        images.append(src)

            if images:
                detail['images'] = images

            time.sleep(1)  # 礼貌性延迟
            return detail

        except Exception as e:
            print(f"抓取商品详情失败 {product_url}: {e}")
            return None

    def enrich_products_with_details(self, max_products=10):
        """为商品列表补充详情信息"""
        enriched_products = []

        for i, product in enumerate(self.products[:max_products], 1):
            if 'url' in product:
                print(f"正在补充详情 ({i}/{min(len(self.products), max_products)}): {product.get('title', 'N/A')}")

                details = self.scrape_product_detail(product['url'])
                if details:
                    product.update(details)

                enriched_products.append(product)

        return enriched_products

    def scrape_product_reviews(self, product_url, max_reviews=20):
        """抓取商品评价"""
        reviews = []

        # 构建评价页URL(根据实际网站调整)
        # 假设评价页URL模式为:商品URL + /reviews?page=1
        base_review_url = product_url.rstrip('/') + '/reviews'

        for page in range(1, 3):  # 抓取前2页评价
            review_url = f"{base_review_url}?page={page}"

            try:
                response = self.session.get(review_url, timeout=10)
                soup = BeautifulSoup(response.content, 'lxml')

                # 提取评价项(根据实际网站结构调整)
                review_items = soup.select('.review-item, .comment-item')

                for item in review_items:
                    if len(reviews) >= max_reviews:
                        break

                    review = {}

                    # 提取用户名
                    user_elem = item.select_one('.user-name, .author')
                    if user_elem:
                        review['user'] = user_elem.get_text(strip=True)

                    # 提取评分
                    rating_elem = item.select_one('.review-rating, .star-rating')
                    if rating_elem:
                        rating_text = rating_elem.get_text(strip=True)
                        review['rating'] = self.extract_rating(rating_text)

                    # 提取评价内容
                    content_elem = item.select_one('.review-content, .comment-text')
                    if content_elem:
                        review['content'] = content_elem.get_text(strip=True)

                    # 提取评价时间
                    time_elem = item.select_one('.review-time, .comment-date')
                    if time_elem:
                        review['time'] = time_elem.get_text(strip=True)

                    # 提取有用数
                    helpful_elem = item.select_one('.helpful-count, .useful-count')
                    if helpful_elem:
                        helpful_text = helpful_elem.get_text(strip=True)
                        numbers = re.findall(r'\d+', helpful_text)
                        if numbers:
                            review['helpful_count'] = int(numbers[0])

                    if review:
                        reviews.append(review)

                time.sleep(1)

            except Exception as e:
                print(f"抓取评价页 {page} 失败: {e}")
                continue

        return reviews

    def analyze_products(self):
        """分析商品数据"""
        if not self.products:
            return {}

        analysis = {
            'total_products': len(self.products),
            'price_range': {
                'min': float('inf'),
                'max': 0,
                'avg': 0
            },
            'rating_stats': {
                'avg': 0,
                'distribution': {
                    '5星': 0,
                    '4星': 0,
                    '3星': 0,
                    '2星': 0,
                    '1星': 0
                }
            },
            'shop_distribution': {}
        }

        total_price = 0
        total_rating = 0
        rating_count = 0

        for product in self.products:
            # 价格分析
            price = product.get('price', 0)
            if price > 0:
                analysis['price_range']['min'] = min(analysis['price_range']['min'], price)
                analysis['price_range']['max'] = max(analysis['price_range']['max'], price)
                total_price += price

            # 评分分析
            rating = product.get('rating', 0)
            if rating > 0:
                total_rating += rating
                rating_count += 1

                # 评分分布
                if rating >= 4.5:
                    analysis['rating_stats']['distribution']['5星'] += 1
                elif rating >= 3.5:
                    analysis['rating_stats']['distribution']['4星'] += 1
                elif rating >= 2.5:
                    analysis['rating_stats']['distribution']['3星'] += 1
                elif rating >= 1.5:
                    analysis['rating_stats']['distribution']['2星'] += 1
                else:
                    analysis['rating_stats']['distribution']['1星'] += 1

            # 店铺分布
            shop = product.get('shop')
            if shop:
                if shop in analysis['shop_distribution']:
                    analysis['shop_distribution'][shop] += 1
                else:
                    analysis['shop_distribution'][shop] = 1

        # 计算平均值
        if len(self.products) > 0:
            analysis['price_range']['avg'] = total_price / len(self.products)

        # 没有任何有效价格时,避免最小值停留在inf
        if analysis['price_range']['min'] == float('inf'):
            analysis['price_range']['min'] = 0

        if rating_count > 0:
            analysis['rating_stats']['avg'] = total_rating / rating_count

        return analysis

    def save_to_csv(self, filename='products.csv'):
        """保存到CSV"""
        if not self.products:
            return False

        fieldnames = [
            'product_id', 'title', 'price', 'original_price', 'rating',
            'review_count', 'shop', 'url', 'image_url'
        ]

        try:
            with open(filename, 'w', newline='', encoding='utf-8-sig') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writeheader()

                for product in self.products:
                    # 只写入基本字段
                    row = {field: product.get(field, '') for field in fieldnames}
                    writer.writerow(row)

            print(f"数据已保存到 {filename}")
            return True
        except Exception as e:
            print(f"保存CSV失败: {e}")
            return False

# 使用示例
def example_ecommerce_scraping():
    """电商抓取示例"""

    # 创建抓取器(以示例网站为例)
    scraper = EcommerceScraper(base_url='https://example-shop.com')

    # 搜索商品
    search_results = scraper.scrape_search_results(
        search_query='笔记本电脑',
        max_pages=2
    )

    if search_results:
        # 补充详情信息
        enriched_products = scraper.enrich_products_with_details(max_products=5)

        # 分析数据
        analysis = scraper.analyze_products()

        print("\n=== 数据分析 ===")
        print(f"总商品数: {analysis['total_products']}")
        print(f"价格范围: ¥{analysis['price_range']['min']:.2f} - ¥{analysis['price_range']['max']:.2f}")
        print(f"平均价格: ¥{analysis['price_range']['avg']:.2f}")
        print(f"平均评分: {analysis['rating_stats']['avg']:.2f}")

        # 显示前3个商品
        print("\n=== 商品示例 ===")
        for i, product in enumerate(search_results[:3], 1):
            print(f"\n{i}. {product.get('title', 'N/A')}")
            print(f"   价格: ¥{product.get('price', 0):.2f}")
            print(f"   评分: {product.get('rating', 0):.1f} ({product.get('review_count', 0)}条评价)")
            print(f"   店铺: {product.get('shop', '未知')}")

        # 保存数据
        scraper.save_to_csv('products_sample.csv')

    return search_results

if __name__ == "__main__":
    # 这是一个示例,实际使用时需要替换为真实的URL
    print("这是一个电商抓取器的示例代码")
    print("实际使用时需要根据目标网站的结构调整选择器")
    # example_ecommerce_scraping()

项目总结与扩展

项目经验总结

项目一:豆瓣电影Top250
  • 核心技术:分页处理、数据清洗、正则表达式
  • 主要难点:编码问题、反爬虫限制
  • 可扩展方向:添加图片下载、数据库存储、API开发

项目二:新闻网站抓取
  • 核心技术:CSS选择器、URL处理、数据结构化
  • 主要难点:动态内容、网站结构多变
  • 可扩展方向:情感分析、自动摘要、热点发现

项目三:电商商品抓取
  • 核心技术:价格提取、评分分析、商品关联
  • 主要难点:反爬虫机制、数据量大
  • 可扩展方向:价格监控、竞品分析、推荐系统
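
以上面豆瓣项目的"数据库存储"扩展方向为例,可以用标准库 sqlite3 把抓取结果落库。下面是一个简化示意(表结构与字段均为假设,可按实际需求调整):

import sqlite3

def save_movies_to_sqlite(movies, db_file='douban.db'):
    """把电影字典列表写入SQLite(示意:只保存几个核心字段)"""
    conn = sqlite3.connect(db_file)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS movies (
                movie_rank TEXT,
                title      TEXT,
                rating     TEXT,
                year       TEXT,
                link       TEXT
            )
        """)
        conn.executemany(
            "INSERT INTO movies (movie_rank, title, rating, year, link) VALUES (?, ?, ?, ?, ?)",
            [
                (m.get('rank'), m.get('title'), m.get('rating'), m.get('year'), m.get('link'))
                for m in movies
            ]
        )
        conn.commit()
    finally:
        conn.close()

# 用法示意:save_movies_to_sqlite(scraper.movies)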

常见问题与解决方案

问题1:网站有反爬虫机制怎么办?(列表后附一段综合示意代码)
  • 使用代理IP:轮换IP地址避免被封
  • 设置随机延迟:模拟人类浏览行为
  • 使用Session:保持Cookies和会话状态
  • 伪装请求头:随机切换User-Agent
  • 遵守robots.txt:尊重网站爬虫政策
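
针对上面几条建议,下面是一段综合示意代码:用Session保持会话、随机切换User-Agent、加入随机延迟,并预留代理轮换的位置(User-Agent列表和代理地址均为示例,需替换为实际可用的值):

import random
import time
import requests

USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
]

# 可选的代理池,留空则不使用代理(此处仅为占位示例)
PROXIES = [
    # {'http': 'http://127.0.0.1:8080', 'https': 'http://127.0.0.1:8080'},
]

session = requests.Session()

def polite_get(url, min_delay=1.0, max_delay=3.0, **kwargs):
    """随机延迟 + 随机User-Agent的GET请求,降低被识别为爬虫的概率"""
    time.sleep(random.uniform(min_delay, max_delay))             # 随机延迟
    session.headers['User-Agent'] = random.choice(USER_AGENTS)   # 随机切换UA
    if PROXIES:
        kwargs.setdefault('proxies', random.choice(PROXIES))     # 轮换代理IP
    response = session.get(url, timeout=10, **kwargs)
    response.raise_for_status()
    return response
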
问题2:如何处理动态加载的内容?(列表后附接口调用示意)
  • 分析API接口:直接调用网站的数据接口
  • 使用Selenium:模拟浏览器行为
  • 分析JavaScript:找到数据加载的逻辑
  • 等待策略:设置合理的等待时间
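
以"分析API接口"为例,很多网站的列表数据实际来自返回JSON的接口,可以在浏览器开发者工具的Network面板中找到。下面是一个示意(接口地址与返回字段均为假设):

import requests

# 假设通过开发者工具发现列表数据来自如下JSON接口
api_url = 'https://example-news.com/api/articles'
params = {'category': 'technology', 'page': 1, 'page_size': 20}

response = requests.get(api_url, params=params, timeout=10)
response.raise_for_status()

data = response.json()  # 直接得到结构化数据,无需解析HTML
for item in data.get('articles', []):
    print(item.get('title'), item.get('url'))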

下一步学习建议

  1. 学习Scrapy框架:对于大型爬虫项目,Scrapy更高效
  2. 数据库存储:学习使用MySQL、MongoDB等数据库存储数据
  3. 数据清洗:掌握Pandas进行数据清洗和分析(见本列表后的示意)
  4. API开发:将爬虫数据通过API提供服务
  5. 可视化展示:使用ECharts、Matplotlib等展示数据
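
以其中的"数据清洗"为例,可以用Pandas直接读取案例一保存的CSV做快速统计(假设已用默认文件名 douban_top250.csv 保存):

import pandas as pd

# 读取案例一保存的CSV(utf-8-sig编码写入,read_csv可直接识别)
df = pd.read_csv('douban_top250.csv')

# 评分转为数值并计算平均分
df['rating'] = pd.to_numeric(df['rating'], errors='coerce')
print('平均评分:', round(df['rating'].mean(), 2))

# 各年份电影数量前5名
print(df['year'].value_counts().head(5))
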
本章总结:通过三个完整的实战案例,我们演示了如何使用BeautifulSoup进行实际的网页数据抓取。从简单的静态页面到复杂的动态网站,从数据抓取到数据分析和存储,这些案例涵盖了爬虫开发的核心技术。记住,编写爬虫程序不仅需要技术能力,还需要遵守法律法规和网站规则,尊重数据所有权。希望这些案例能帮助你更好地应用BeautifulSoup解决实际问题。