Requests 结合 BeautifulSoup 解析 HTML

黄金组合 - Requests 用于获取网页内容,BeautifulSoup 用于解析 HTML,这是 Python 网络爬虫和数据抓取的标准工具组合。

为什么需要结合使用 Requests 和 BeautifulSoup?

Requests(获取网页内容) → BeautifulSoup(解析HTML结构) → 数据提取(提取所需信息)
Requests 的作用
  • 发送HTTP请求: GET, POST, PUT, DELETE 等
  • 处理响应: 状态码、响应头、Cookie 等
  • 会话管理: 保持登录状态
  • 错误处理: 网络异常、超时等
  • 代理支持: 通过代理服务器请求
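上面列出的能力可以用一个最小示例串起来(以 httpbin.org 这个公共测试站点为例,代理一行仅作占位示意):
import requests

# 创建会话:复用连接、保持Cookie(如登录状态)
session = requests.Session()
session.headers.update({'User-Agent': 'Mozilla/5.0'})

try:
    # 发送GET请求(POST/PUT/DELETE同理,换成session.post等即可)
    response = session.get(
        'https://httpbin.org/get',
        params={'q': 'python'},
        timeout=10,  # 超时控制
        # proxies={'https': 'http://127.0.0.1:8080'},  # 如需代理,取消注释并换成真实地址
    )
    response.raise_for_status()                   # 非2xx状态码抛出异常

    print(response.status_code)                   # 状态码
    print(response.headers.get('Content-Type'))   # 响应头
    print(response.cookies.get_dict())            # Cookie
except requests.exceptions.RequestException as e:
    print(f"网络异常或超时: {e}")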
BeautifulSoup 的作用
  • 解析HTML/XML: 将文档转换为树状结构
  • 导航文档: 父节点、子节点、兄弟节点
  • 搜索元素: 标签名、类名、ID、属性等
  • 提取数据: 文本、属性、链接等
  • 修改文档: 添加、删除、修改元素
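一个最小示例(HTML为虚构的演示片段),依次演示解析、搜索、导航、提取和修改:
from bs4 import BeautifulSoup

html = '<div id="box"><p class="msg">你好</p><a href="/home">首页</a></div>'
soup = BeautifulSoup(html, 'html.parser')   # 解析为文档树

p = soup.find('p', class_='msg')            # 按标签名+类名搜索元素
print(p.text)                               # 提取文本 -> 你好
print(p.parent.name)                        # 导航到父节点 -> div
print(soup.a['href'])                       # 提取属性 -> /home

# 修改文档:新增一个标签
new_tag = soup.new_tag('span')
new_tag.string = '新增内容'
soup.div.append(new_tag)
print(soup.div)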
解析器比较:选择适合的HTML解析器
常用解析器
  • html.parser: Python标准库,速度慢但无需安装
  • lxml: 速度快,功能强大,需要安装C扩展
  • html5lib: 解析最准确,速度最慢,遵循HTML5规范
推荐选择
  • 开发/测试: html.parser (无需额外依赖)
  • 生产环境: lxml (性能最好)
  • 复杂页面: html5lib (容错性最好)
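解析器通过 BeautifulSoup 的第二个参数指定。下面是一个简单示意:优先使用 lxml,未安装时回退到标准库的 html.parser:
from bs4 import BeautifulSoup

# 优先使用lxml,未安装时回退到标准库的html.parser
try:
    import lxml  # noqa: F401  仅用于检测是否已安装
    PARSER = 'lxml'
except ImportError:
    PARSER = 'html.parser'

soup = BeautifulSoup('<p>测试</p>', PARSER)
print(f"当前使用的解析器: {PARSER}, 解析结果: {soup.p.text}")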

安装与配置

安装所需库
# 安装 Requests 和 BeautifulSoup4
pip install requests beautifulsoup4

# 安装可选的解析器(推荐)
pip install lxml                # 高性能解析器
pip install html5lib            # HTML5解析器

# 验证安装
python -c "import requests; import bs4; print('安装成功')"

# 或者使用 requirements.txt
# requests==2.28.1
# beautifulsoup4==4.11.1
# lxml==4.9.1

基础导入与配置
import requests
from bs4 import BeautifulSoup
import time
import re
import json
import csv
import os

# 配置请求头,模拟浏览器
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Accept-Encoding': 'gzip, deflate',
    'Connection': 'keep-alive',
}

# 配置BeautifulSoup解析器
PARSER = 'lxml'  # 也可以使用 'html.parser' 或 'html5lib'

# 创建会话
session = requests.Session()
session.headers.update(HEADERS)

基本用法

使用 Requests 获取网页,然后使用 BeautifulSoup 解析的基本流程。

示例HTML文档:
<html>
  <head>
    <title>示例网页</title>
  </head>
  <body>
    <h1 id="main-title">欢迎光临</h1>
    <div class="content">
      <p>这是一个示例段落。</p>
    </div>
  </body>
</html>
基本工作流程示例
import requests
from bs4 import BeautifulSoup

def fetch_and_parse(url):
    """获取网页并解析的基本函数"""

    try:
        # 1. 使用Requests获取网页
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # 检查HTTP错误

        # 2. 设置正确的编码(如果需要)
        response.encoding = response.apparent_encoding

        # 3. 使用BeautifulSoup解析HTML
        soup = BeautifulSoup(response.text, 'lxml')

        # 4. 提取数据
        # 获取页面标题
        page_title = soup.title.string if soup.title else "无标题"
        print(f"页面标题: {page_title}")

        # 获取所有链接
        links = soup.find_all('a')
        print(f"找到 {len(links)} 个链接")

        # 获取特定元素
        main_content = soup.find('div', class_='content')
        if main_content:
            print(f"主要内容: {main_content.get_text(strip=True)[:100]}...")

        return soup

    except requests.exceptions.RequestException as e:
        print(f"请求错误: {e}")
        return None
    except Exception as e:
        print(f"解析错误: {e}")
        return None

# 使用示例
if __name__ == "__main__":
    # 测试URL
    test_url = "https://httpbin.org/html"
    soup = fetch_and_parse(test_url)

    if soup:
        print("解析成功!")
        # 可以继续处理soup对象...
解析过程演示
输入: HTML文档
<div class="product">
  <h3>Python编程</h3>
  <p class="price">¥59.99</p>
  <a href="/buy/python">购买</a>
</div>
输出: 提取的数据
{
  "title": "Python编程",
  "price": "¥59.99",
  "link": "/buy/python"
}
BeautifulSoup代码
product = soup.find('div', class_='product')
title = product.h3.text
price = product.find('p', class_='price').text
link = product.a['href']
工作原理
  • find() 查找第一个匹配元素
  • .text 获取元素文本内容
  • ['href'] 获取元素属性
  • 通过点号访问子元素
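把上面的输入HTML和提取代码合在一起,就是一个可以直接运行的小片段:
from bs4 import BeautifulSoup

html = '''
<div class="product">
  <h3>Python编程</h3>
  <p class="price">¥59.99</p>
  <a href="/buy/python">购买</a>
</div>
'''

soup = BeautifulSoup(html, 'html.parser')
product = soup.find('div', class_='product')

data = {
    'title': product.h3.text,                         # 点号访问子元素 + 文本
    'price': product.find('p', class_='price').text,  # find() 查找第一个匹配
    'link': product.a['href'],                        # ['href'] 获取属性
}
print(data)  # {'title': 'Python编程', 'price': '¥59.99', 'link': '/buy/python'}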

BeautifulSoup 选择器详解

标签选择器

通过标签名选择元素

# 查找所有div标签
soup.find_all('div')

# 查找第一个p标签
soup.find('p')
类选择器

通过CSS类选择元素

# 查找class为content的元素
soup.find_all(class_='content')

# 查找class包含active的元素
soup.find_all(class_=re.compile('active'))
ID选择器

通过ID选择元素

# 查找id为main的元素
soup.find(id='main')

# 通过属性查找
soup.find(attrs={'id': 'main'})
属性选择器

通过属性选择元素

# 查找有href属性的a标签
soup.find_all('a', href=True)

# 查找href以http开头的a标签
soup.find_all('a', href=re.compile('^http'))
CSS选择器

使用CSS选择器语法

# 选择所有class为item的div
soup.select('div.item')

# 选择id为content下的所有p标签
soup.select('#content p')

# 选择第一个匹配的元素
soup.select_one('.item')
层级选择器

通过层级关系选择元素

# 选择所有div下的直接子p标签
soup.select('div > p')

# 选择所有div后的兄弟p标签
soup.select('div ~ p')

# 选择紧接在div后的p标签
soup.select('div + p')

选择器对比与选择指南

  • find() / find_all(): 灵活性中等,易用性简单,适用于简单选择、已知元素结构
  • select() / select_one(): 灵活性中等,易用性中等,适用于复杂CSS选择器、熟悉CSS语法的情况
  • 属性选择器: 灵活性中等,易用性简单,适用于基于属性筛选
  • 正则表达式: 使用较复杂,适用于模糊匹配、复杂模式
综合选择器示例
import re
from bs4 import BeautifulSoup

# 假设我们有以下HTML
html_doc = """
<div class="container">
  <article id="post-123" class="post">
    <h2 class="title">Python爬虫教程</h2>
    <div class="meta">
      <span class="author">张三</span>
      <span class="date">2023-10-01</span>
    </div>
    <div class="content">
      <p>这是一篇关于Python爬虫的教程。</p>
      <a href="https://example.com/more" class="read-more">阅读更多</a>
    </div>
    <div class="tags">
      <a href="/tag/python" class="tag">Python</a>
      <a href="/tag/web" class="tag">Web</a>
    </div>
  </article>
</div>
"""

soup = BeautifulSoup(html_doc, 'lxml')

# 1. 使用find()和find_all()
article = soup.find('article', class_='post')
title = article.find('h2', class_='title').text
author = article.find('span', class_='author').text

# 2. 使用CSS选择器
content = soup.select_one('.content p').text
read_more_link = soup.select_one('.read-more')['href']

# 3. 使用属性选择器
tags = soup.find_all('a', class_='tag', href=re.compile(r'^/tag/'))

# 4. 使用正则表达式
# 查找所有包含"Python"的文本
python_text = soup.find_all(text=re.compile('Python'))

# 5. 组合使用
# 查找article下的所有直接子div
article_divs = article.find_all('div', recursive=False)

# 提取数据
data = {
    'title': title,
    'author': author,
    'content': content,
    'read_more': read_more_link,
    'tags': [tag.text for tag in tags],
    'article_div_count': len(article_divs)
}

print("提取的数据:")
for key, value in data.items():
    print(f"{key}: {value}")

数据提取技巧

常见数据提取场景
1. 提取文本内容
# 获取元素所有文本
text = element.get_text()

# 获取文本并去除空白
clean_text = element.get_text(strip=True)

# 获取特定子元素的文本
title = element.find('h1').text
2. 提取属性值
# 获取单个属性
href = element['href']

# 获取所有属性
attrs = element.attrs

# 安全获取属性(避免KeyError)
href = element.get('href', '')
3. 提取链接和图片
# 提取所有链接
links = [a['href'] for a in soup.find_all('a', href=True)]

# 提取所有图片
images = [img['src'] for img in soup.find_all('img', src=True)]

# 处理相对链接
from urllib.parse import urljoin
full_url = urljoin(base_url, relative_url)
4. 提取表格数据
# 提取表格数据
table = soup.find('table')
rows = table.find_all('tr')
for row in rows:
    cols = row.find_all('td')
    row_data = [col.text.strip() for col in cols]
    # 处理row_data...

数据清洗与处理

数据清洗函数
import re
from datetime import datetime

class DataCleaner:
    """数据清洗工具类"""

    @staticmethod
    def clean_text(text):
        """清理文本:去除多余空白、换行符等"""
        if not text:
            return ""

        # 替换多个空白字符为单个空格
        text = re.sub(r'\s+', ' ', text)
        # 去除首尾空白
        text = text.strip()
        # 去除不可见字符
        text = ''.join(char for char in text if char.isprintable())

        return text

    @staticmethod
    def extract_price(text):
        """从文本中提取价格"""
        if not text:
            return None

        # 匹配数字,包括小数点和千分位分隔符
        match = re.search(r'[\d,]+\.?\d*', text)
        if match:
            # 去除千分位分隔符
            price_str = match.group().replace(',', '')
            try:
                return float(price_str)
            except ValueError:
                return None

        return None

    @staticmethod
    def extract_date(text, date_formats=None):
        """从文本中提取日期"""
        if not text:
            return None

        if date_formats is None:
            date_formats = [
                '%Y-%m-%d',
                '%Y/%m/%d',
                '%d-%m-%Y',
                '%d/%m/%Y',
                '%Y年%m月%d日',
                '%m月%d日, %Y',
            ]

        for date_format in date_formats:
            try:
                return datetime.strptime(text.strip(), date_format)
            except ValueError:
                continue

        # 尝试使用正则表达式匹配
        date_patterns = [
            r'(\d{4})[-/年](\d{1,2})[-/月](\d{1,2})日?',
            r'(\d{1,2})[-/月](\d{1,2})[-/日,]?\s*(\d{4})',
        ]

        for pattern in date_patterns:
            match = re.search(pattern, text)
            if match:
                groups = match.groups()
                if len(groups) == 3:
                    try:
                        nums = list(map(int, groups))
                        # 两个模式的捕获顺序不同:第一个是(年,月,日),第二个是(月,日,年)
                        if nums[0] > 31:
                            year, month, day = nums
                        else:
                            month, day, year = nums
                        # 处理两位数年份
                        if year < 100:
                            year += 2000
                        return datetime(year, month, day)
                    except ValueError:
                        continue

        return None

    @staticmethod
    def normalize_url(url, base_url):
        """规范化URL,处理相对链接"""
        from urllib.parse import urljoin, urlparse

        if not url:
            return None

        # 处理JavaScript链接
        if url.startswith(('javascript:', 'mailto:', 'tel:')):
            return None

        # 处理相对链接
        full_url = urljoin(base_url, url)

        # 解析URL以确保格式正确
        parsed = urlparse(full_url)

        # 重建URL(去除片段标识符等)
        normalized = parsed._replace(fragment='', params='', query='')

        return normalized.geturl()

    @staticmethod
    def extract_emails(text):
        """从文本中提取邮箱地址"""
        email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        return re.findall(email_pattern, text)

    @staticmethod
    def extract_phone_numbers(text):
        """从文本中提取电话号码"""
        phone_patterns = [
            r'\+?[\d\s-]{10,}',  # 国际格式
            r'\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}',  # 美国格式
            r'1[3-9]\d{9}',  # 中国手机号
        ]

        phones = []
        for pattern in phone_patterns:
            phones.extend(re.findall(pattern, text))

        return phones

# 使用示例
if __name__ == "__main__":
    cleaner = DataCleaner()

    # 测试文本清洗
    dirty_text = "   Python  \n  爬虫  \t教程  "
    clean_text = cleaner.clean_text(dirty_text)
    print(f"文本清洗: '{dirty_text}' -> '{clean_text}'")

    # 测试价格提取
    price_text = "价格: ¥1,299.99"
    price = cleaner.extract_price(price_text)
    print(f"价格提取: '{price_text}' -> {price}")

    # 测试日期提取
    date_text = "发布日期: 2023-10-01"
    date = cleaner.extract_date(date_text)
    print(f"日期提取: '{date_text}' -> {date}")

处理嵌套结构和复杂数据

处理复杂HTML结构
import json
from bs4 import BeautifulSoup

def extract_nested_data(soup):
    """从复杂HTML结构中提取嵌套数据"""

    results = []

    # 假设HTML结构类似:
    # <div class="product-item" data-id="...">
    #   <h3>产品名称</h3>
    #   <span class="price">价格</span>
    #   <span class="rating">评分</span>
    #   <ul class="features"><li>特征1</li><li>特征2</li></ul>
    # </div>

    # 查找所有产品项
    product_items = soup.select('.product-item')

    for item in product_items:
        product_data = {}

        # 提取基本信息
        name_elem = item.select_one('h3')
        if name_elem:
            product_data['name'] = name_elem.get_text(strip=True)

        # 提取价格和评分
        price_elem = item.select_one('.price')
        if price_elem:
            product_data['price'] = price_elem.get_text(strip=True)

        rating_elem = item.select_one('.rating')
        if rating_elem:
            product_data['rating'] = rating_elem.get_text(strip=True)

        # 提取特征列表
        features = []
        feature_items = item.select('.features li')
        for feature in feature_items:
            features.append(feature.get_text(strip=True))
        product_data['features'] = features

        # 提取所有链接
        links = []
        for a in item.find_all('a', href=True):
            link_data = {
                'text': a.get_text(strip=True),
                'href': a['href']
            }
            links.append(link_data)
        product_data['links'] = links

        # 提取所有图片
        images = []
        for img in item.find_all('img', src=True):
            img_data = {
                'src': img['src'],
                'alt': img.get('alt', '')
            }
            images.append(img_data)
        product_data['images'] = images

        # 提取自定义数据属性(data-*属性)
        data_attrs = {}
        for attr_name, attr_value in item.attrs.items():
            if attr_name.startswith('data-'):
                data_attrs[attr_name[5:]] = attr_value
        product_data['data_attributes'] = data_attrs

        results.append(product_data)

    return results

def extract_table_data(soup):
    """提取表格数据并转换为结构化格式"""

    tables = soup.find_all('table')
    table_data = []

    for table in tables:
        table_info = {
            'headers': [],
            'rows': []
        }

        # 提取表头
        headers = table.find_all('th')
        if headers:
            table_info['headers'] = [th.get_text(strip=True) for th in headers]

        # 提取表格行
        rows = table.find_all('tr')
        for row in rows:
            # 跳过表头行(th已单独处理)
            if row.find('th'):
                continue

            cols = row.find_all('td')
            if cols:
                row_data = {
                    'cells': [td.get_text(strip=True) for td in cols],
                    'raw_html': str(row)
                }

                # 如果有表头,创建字典格式的行数据
                if table_info['headers'] and len(cols) == len(table_info['headers']):
                    row_dict = {}
                    for i, header in enumerate(table_info['headers']):
                        row_dict[header] = cols[i].get_text(strip=True)
                    row_data['dict'] = row_dict

                table_info['rows'].append(row_data)

        table_data.append(table_info)

    return table_data

# 使用示例
if __name__ == "__main__":
    # 模拟HTML(结构与上面的提取函数对应)
    html = """
    <div class="product-item" data-id="123">
      <h3>智能手机</h3>
      <span class="price">¥2999</span>
      <span class="rating">4.5/5</span>
      <ul class="features">
        <li>6.5英寸屏幕</li>
        <li>128GB存储</li>
        <li>三摄像头</li>
      </ul>
      <a href="/buy/phone">购买</a>
      <a href="/detail/phone">详情</a>
      <img src="/img/phone.jpg" alt="智能手机">
    </div>
    <table>
      <tr><th>型号</th><th>价格</th><th>库存</th></tr>
      <tr><td>iPhone 14</td><td>¥6999</td><td>50</td></tr>
      <tr><td>Samsung S23</td><td>¥5999</td><td>30</td></tr>
    </table>
    """

    soup = BeautifulSoup(html, 'lxml')

    # 提取嵌套数据
    products = extract_nested_data(soup)
    print("产品数据:")
    print(json.dumps(products, indent=2, ensure_ascii=False))

    # 提取表格数据
    tables = extract_table_data(soup)
    print("\n表格数据:")
    print(json.dumps(tables, indent=2, ensure_ascii=False))

实际示例

示例1:新闻网站爬虫

1
分析目标网站

确定需要提取的数据:标题、发布时间、作者、内容、分类等。

2
编写爬虫代码
新闻爬虫示例
import requests
from bs4 import BeautifulSoup
import re
from datetime import datetime
import json

class NewsScraper:
    """新闻网站爬虫"""

    def __init__(self, base_url):
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def fetch_article_links(self, category_url):
        """获取文章列表页的所有文章链接"""

        try:
            response = self.session.get(category_url, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'lxml')

            # 根据实际网站结构调整选择器
            article_links = []

            # 方法1: 通过CSS选择器
            articles = soup.select('.article-list .article-item a')
            for article in articles:
                if article.get('href'):
                    link = article['href']
                    if not link.startswith('http'):
                        link = self.base_url + link
                    article_links.append(link)

            # 方法2: 通过正则表达式
            # 查找所有包含/article/的链接
            all_links = soup.find_all('a', href=re.compile(r'/article/\d+'))
            for link in all_links:
                href = link['href']
                if not href.startswith('http'):
                    href = self.base_url + href
                article_links.append(href)

            # 去重
            article_links = list(set(article_links))

            return article_links

        except Exception as e:
            print(f"获取文章链接失败: {e}")
            return []

    def parse_article(self, article_url):
        """解析单篇文章"""

        try:
            response = self.session.get(article_url, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'lxml')

            article_data = {
                'url': article_url,
                'title': '',
                'author': '',
                'publish_time': '',
                'content': '',
                'category': '',
                'tags': []
            }

            # 提取标题(根据实际网站调整选择器)
            title_elem = soup.find('h1') or soup.find(class_='title') or soup.find(id='title')
            if title_elem:
                article_data['title'] = title_elem.get_text(strip=True)

            # 提取作者
            author_elem = soup.find(class_='author') or soup.find(attrs={'itemprop': 'author'})
            if author_elem:
                article_data['author'] = author_elem.get_text(strip=True)

            # 提取发布时间
            time_elem = soup.find(class_='publish-time') or soup.find('time')
            if time_elem:
                time_text = time_elem.get_text(strip=True)
                article_data['publish_time'] = self._parse_datetime(time_text)

            # 提取内容
            content_elem = soup.find(class_='content') or soup.find(attrs={'itemprop': 'articleBody'})
            if content_elem:
                # 清理内容,移除脚本、样式等
                for script in content_elem.find_all(['script', 'style']):
                    script.decompose()

                article_data['content'] = content_elem.get_text(strip=True)

            # 提取分类
            category_elem = soup.find(class_='category') or soup.find(attrs={'rel': 'category'})
            if category_elem:
                article_data['category'] = category_elem.get_text(strip=True)

            # 提取标签
            tags_container = soup.find(class_='tags') or soup.find(class_='keywords')
            if tags_container:
                tags = tags_container.find_all('a')
                article_data['tags'] = [tag.get_text(strip=True) for tag in tags]

            return article_data

        except Exception as e:
            print(f"解析文章失败 {article_url}: {e}")
            return None

    def _parse_datetime(self, time_text):
        """解析日期时间字符串"""

        date_patterns = [
            '%Y-%m-%d %H:%M:%S',
            '%Y-%m-%d %H:%M',
            '%Y年%m月%d日 %H:%M',
            '%Y/%m/%d %H:%M',
        ]

        for pattern in date_patterns:
            try:
                return datetime.strptime(time_text, pattern).isoformat()
            except ValueError:
                continue

        # 如果无法解析,返回原始文本
        return time_text

    def scrape_category(self, category_url, max_articles=10):
        """爬取整个分类的文章"""

        print(f"开始爬取分类: {category_url}")

        # 获取文章链接
        article_links = self.fetch_article_links(category_url)
        print(f"找到 {len(article_links)} 篇文章")

        # 限制爬取数量
        article_links = article_links[:max_articles]

        # 爬取每篇文章
        articles = []
        for i, link in enumerate(article_links, 1):
            print(f"正在爬取第 {i}/{len(article_links)} 篇文章: {link}")

            article_data = self.parse_article(link)
            if article_data:
                articles.append(article_data)

            # 避免请求过快
            import time
            time.sleep(1)

        return articles

# 使用示例
if __name__ == "__main__":
    # 创建爬虫实例
    scraper = NewsScraper('https://news.example.com')

    # 爬取新闻分类
    news_articles = scraper.scrape_category(
        'https://news.example.com/tech',
        max_articles=5
    )

    print(f"\n爬取完成,共获取 {len(news_articles)} 篇文章")

    # 保存数据
    if news_articles:
        with open('news_articles.json', 'w', encoding='utf-8') as f:
            json.dump(news_articles, f, indent=2, ensure_ascii=False)
        print("数据已保存到 news_articles.json")

        # 显示部分数据
        print("\n第一篇文章摘要:")
        first_article = news_articles[0]
        for key in ['title', 'author', 'publish_time']:
            print(f"{key}: {first_article.get(key, 'N/A')}")
3
数据可视化展示
标题: Python 3.11 发布,性能提升 25%
作者: 张三
发布时间: 2023-10-01 10:30:00
分类: 科技
标签: [Python, 编程, 技术]
内容摘要: Python 3.11 正式发布,带来了显著的性能改进...

示例2:电商网站价格监控

价格监控爬虫
import requests
from bs4 import BeautifulSoup
import re
import json
import csv
import time
from datetime import datetime
import smtplib
from email.mime.text import MIMEText

class PriceMonitor:
    """电商网站价格监控"""

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept-Language': 'zh-CN,zh;q=0.9',
        })
        self.price_history = {}

    def monitor_product(self, product_url, selector_config):
        """监控单个商品价格"""

        try:
            response = self.session.get(product_url, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'lxml')

            # 提取商品信息
            product_info = {}

            # 提取商品名称
            if 'name_selector' in selector_config:
                name_elem = soup.select_one(selector_config['name_selector'])
                if name_elem:
                    product_info['name'] = name_elem.get_text(strip=True)

            # 提取商品价格
            if 'price_selector' in selector_config:
                price_elem = soup.select_one(selector_config['price_selector'])
                if price_elem:
                    price_text = price_elem.get_text(strip=True)
                    product_info['price'] = self._extract_price_number(price_text)
                    product_info['price_text'] = price_text

            # 提取库存状态
            if 'stock_selector' in selector_config:
                stock_elem = soup.select_one(selector_config['stock_selector'])
                if stock_elem:
                    product_info['stock'] = stock_elem.get_text(strip=True)

            # 提取评分
            if 'rating_selector' in selector_config:
                rating_elem = soup.select_one(selector_config['rating_selector'])
                if rating_elem:
                    product_info['rating'] = rating_elem.get_text(strip=True)

            # 记录时间戳
            product_info['check_time'] = datetime.now().isoformat()
            product_info['url'] = product_url

            return product_info

        except Exception as e:
            print(f"监控商品失败 {product_url}: {e}")
            return None

    def _extract_price_number(self, price_text):
        """从价格文本中提取数字"""

        # 移除货币符号和千分位分隔符
        clean_text = re.sub(r'[^\d.,]', '', price_text)
        clean_text = clean_text.replace(',', '')

        try:
            # 尝试转换为浮点数
            return float(clean_text)
        except ValueError:
            return None

    def check_price_drop(self, product_url, current_price, threshold=0.9):
        """检查价格是否下降"""

        if product_url not in self.price_history:
            # 第一次检查,记录价格
            self.price_history[product_url] = {
                'last_price': current_price,
                'lowest_price': current_price,
                'price_changes': []
            }
            return False

        history = self.price_history[product_url]
        last_price = history['last_price']

        # 记录价格变化
        price_change = {
            'time': datetime.now().isoformat(),
            'price': current_price,
            'change': current_price - last_price if last_price else 0,
            'change_percent': ((current_price - last_price) / last_price * 100) if last_price else 0
        }

        history['price_changes'].append(price_change)
        history['last_price'] = current_price

        # 更新最低价格
        if current_price < history['lowest_price']:
            history['lowest_price'] = current_price

        # 检查价格下降是否超过阈值
        if last_price and current_price < last_price * threshold:
            return True

        return False

    def save_to_csv(self, product_info, filename='price_history.csv'):
        """保存价格历史到CSV文件"""

        fieldnames = ['check_time', 'name', 'price', 'price_text', 'stock', 'rating', 'url']

        # 如果文件不存在,创建并写入表头
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                # 文件已存在
                pass
        except FileNotFoundError:
            with open(filename, 'w', encoding='utf-8', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()

        # 追加数据
        with open(filename, 'a', encoding='utf-8', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writerow(product_info)

    def send_price_alert(self, product_info, old_price, new_price, recipient_email):
        """发送价格下降提醒邮件"""

        subject = f"价格提醒: {product_info['name']} 降价了!"

        message = f"""
        商品名称: {product_info['name']}
        商品链接: {product_info['url']}

        价格变化:
        原价: ¥{old_price}
        现价: ¥{new_price}
        降价: ¥{old_price - new_price} ({(old_price - new_price) / old_price * 100:.1f}%)

        检查时间: {product_info['check_time']}
        """

        # 发送邮件(需要配置SMTP)
        try:
            msg = MIMEText(message, 'plain', 'utf-8')
            msg['Subject'] = subject
            msg['From'] = 'price-monitor@example.com'
            msg['To'] = recipient_email

            # 使用SMTP发送邮件
            # with smtplib.SMTP('smtp.example.com', 587) as server:
            #     server.starttls()
            #     server.login('username', 'password')
            #     server.send_message(msg)

            print(f"价格提醒邮件已发送到 {recipient_email}")

        except Exception as e:
            print(f"发送邮件失败: {e}")

# 使用示例
if __name__ == "__main__":
    monitor = PriceMonitor()

    # 配置不同网站的商品选择器
    products_to_monitor = [
        {
            'url': 'https://www.example.com/product/123',
            'name': 'Python编程书',
            'selectors': {
                'name_selector': '.product-title',
                'price_selector': '.price-special',
                'stock_selector': '.stock-status',
                'rating_selector': '.rating-value'
            }
        },
        # 可以添加更多商品
    ]

    print("开始价格监控...")

    for product in products_to_monitor:
        print(f"监控商品: {product['name']}")

        # 获取当前价格
        product_info = monitor.monitor_product(
            product['url'],
            product['selectors']
        )

        if product_info and 'price' in product_info:
            print(f"当前价格: ¥{product_info['price']}")

            # 检查价格是否下降
            price_dropped = monitor.check_price_drop(
                product['url'],
                product_info['price'],
                threshold=0.95  # 价格下降5%时提醒
            )

            if price_dropped:
                print(f"价格下降! 发送提醒...")
                # 发送提醒邮件
                # monitor.send_price_alert(
                #     product_info,
                #     monitor.price_history[product['url']]['last_price'],
                #     product_info['price'],
                #     'user@example.com'
                # )

            # 保存到CSV
            monitor.save_to_csv(product_info)

        # 避免请求过快
        time.sleep(2)

    print("监控完成")

处理 JavaScript 渲染的页面

挑战:动态内容

许多现代网站使用JavaScript动态加载内容,Requests + BeautifulSoup无法直接获取这些动态生成的内容。

解决方案
方法1: 分析API请求

使用浏览器开发者工具分析页面加载的API请求,直接请求API获取数据。

  • 打开开发者工具(F12)
  • 转到Network标签
  • 刷新页面查看XHR/Fetch请求
  • 复制API请求进行模拟
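找到API端点后,通常可以直接用 Requests 复现该请求。下面以公共测试API jsonplaceholder.typicode.com 作示意,实际使用时把URL、参数和请求头换成开发者工具中看到的内容:
import requests

# 从开发者工具 Network 面板复制到的API地址、参数和必要的请求头
api_url = 'https://jsonplaceholder.typicode.com/posts'   # 示例API
params = {'userId': 1}
headers = {
    'User-Agent': 'Mozilla/5.0',
    'Accept': 'application/json',
    # 某些站点还需要带上 Referer、Cookie 或自定义 Token 头
}

response = requests.get(api_url, params=params, headers=headers, timeout=10)
response.raise_for_status()

data = response.json()   # API通常直接返回JSON,无需解析HTML
print(f"获取到 {len(data)} 条记录")
print(data[0]['title'])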
方法2: 使用Selenium

使用Selenium控制真实浏览器,等待JavaScript执行完成后再获取页面内容。

  • 安装:pip install selenium
  • 需要浏览器驱动(ChromeDriver)
  • 可以模拟用户操作
  • 资源消耗大,速度慢
方法3: 使用Playwright

微软开发的浏览器自动化工具,比Selenium更现代(与 BeautifulSoup 结合的示意见本节末尾)。

  • 安装:pip install playwright
  • 自动下载浏览器
  • API更现代化
  • 支持多种浏览器
方法4: 使用requests-html

Kenneth Reitz开发的库,内置JavaScript执行引擎。

  • 安装:pip install requests-html
  • 基于pyppeteer
  • API类似Requests
  • 支持JavaScript渲染
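下面是方法3(Playwright)与 BeautifulSoup 结合的一个最小示意(假设已执行 pip install playwright 和 playwright install chromium):
from bs4 import BeautifulSoup
from playwright.sync_api import sync_playwright

def fetch_rendered_html(url):
    """用无头浏览器加载页面,返回JavaScript渲染后的HTML"""
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto(url, wait_until='networkidle')  # 等待网络空闲,动态内容基本加载完成
        html = page.content()
        browser.close()
    return html

if __name__ == "__main__":
    html = fetch_rendered_html('https://example.com')  # 换成实际的动态页面地址
    soup = BeautifulSoup(html, 'lxml')
    print(soup.title.string if soup.title else '无标题')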

使用Selenium + BeautifulSoup示例

Selenium与BeautifulSoup结合使用
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
import time

class JSPageScraper:
    """处理JavaScript页面的爬虫"""

    def __init__(self, headless=True):
        """初始化Selenium驱动"""

        options = webdriver.ChromeOptions()

        if headless:
            options.add_argument('--headless')  # 无头模式

        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--disable-gpu')
        options.add_argument('--window-size=1920,1080')

        # 可以添加更多选项
        options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')

        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 10)

    def load_page(self, url, wait_for=None):
        """加载页面并等待元素出现"""

        print(f"加载页面: {url}")
        self.driver.get(url)

        if wait_for:
            # 等待特定元素出现
            if isinstance(wait_for, str):
                # CSS选择器
                self.wait.until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, wait_for))
                )
            elif isinstance(wait_for, tuple):
                # (By, selector) 元组
                self.wait.until(
                    EC.presence_of_element_located(wait_for)
                )

        # 等待页面完全加载
        time.sleep(2)

    def get_page_source(self):
        """获取页面源代码(包含JavaScript渲染后的内容)"""

        return self.driver.page_source

    def parse_with_bs4(self):
        """使用BeautifulSoup解析当前页面"""

        html = self.get_page_source()
        return BeautifulSoup(html, 'lxml')

    def click_and_wait(self, selector, wait_selector=None):
        """点击元素并等待"""

        element = self.driver.find_element(By.CSS_SELECTOR, selector)
        element.click()

        if wait_selector:
            self.wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR, wait_selector))
            )

        time.sleep(1)

    def scroll_to_bottom(self):
        """滚动到页面底部(用于加载更多内容)"""

        # 获取当前页面高度
        last_height = self.driver.execute_script("return document.body.scrollHeight")

        while True:
            # 滚动到底部
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

            # 等待加载
            time.sleep(2)

            # 计算新的页面高度
            new_height = self.driver.execute_script("return document.body.scrollHeight")

            # 如果高度没有变化,停止滚动
            if new_height == last_height:
                break

            last_height = new_height

    def execute_js(self, script):
        """执行JavaScript代码"""

        return self.driver.execute_script(script)

    def extract_dynamic_content(self, url, content_selector):
        """提取动态加载的内容"""

        # 加载页面
        self.load_page(url, wait_for=content_selector)

        # 如果需要,滚动加载更多内容
        self.scroll_to_bottom()

        # 使用BeautifulSoup解析
        soup = self.parse_with_bs4()

        # 提取内容
        content_elements = soup.select(content_selector)

        results = []
        for element in content_elements:
            # 提取所需数据
            item_data = {
                'text': element.get_text(strip=True),
                'html': str(element),
                'attributes': element.attrs
            }
            results.append(item_data)

        return results

    def close(self):
        """关闭浏览器驱动"""

        self.driver.quit()

# 使用示例
if __name__ == "__main__":
    scraper = JSPageScraper(headless=True)

    try:
        # 加载一个使用JavaScript渲染的页面
        url = "https://example-spa.com/products"  # 假设这是一个单页面应用

        # 提取动态内容
        products = scraper.extract_dynamic_content(
            url,
            content_selector='.product-item'
        )

        print(f"提取到 {len(products)} 个产品")

        # 显示前几个产品
        for i, product in enumerate(products[:3], 1):
            print(f"\n产品 {i}:")
            print(f"文本: {product['text'][:100]}...")
            if 'class' in product['attributes']:
                print(f"类名: {product['attributes']['class']}")

        # 执行自定义JavaScript
        # 例如:获取页面标题
        page_title = scraper.execute_js("return document.title;")
        print(f"\n页面标题: {page_title}")

        # 获取页面中的链接数量
        link_count = scraper.execute_js("return document.querySelectorAll('a').length;")
        print(f"页面链接数量: {link_count}")

    finally:
        # 确保关闭浏览器
        scraper.close()

分析API请求示例

直接请求API获取数据
import requests
import json
import re
from bs4 import BeautifulSoup

class APIScraper:
    """通过分析API请求获取数据"""

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Content-Type': 'application/json',
        })

    def find_api_endpoints(self, url):
        """分析页面,查找API端点"""

        try:
            # 首先获取页面HTML
            response = self.session.get(url)
            soup = BeautifulSoup(response.text, 'lxml')

            # 方法1: 查找包含API端点的script标签
            api_endpoints = []

            # 查找所有script标签
            for script in soup.find_all('script'):
                if script.string:
                    # 在JavaScript代码中查找API URL
                    # 常见的API URL模式
                    patterns = [
                        r'https?://[^"\']+?/api/[^"\']+',
                        r'https?://[^"\']+?/v\d+/[^"\']+',
                        r'fetch\(["\']([^"\']+)["\']\)',
                        r'axios\.(get|post|put|delete)\(["\']([^"\']+)["\']',
                        r'\.ajax\([^)]*url:\s*["\']([^"\']+)["\']',
                    ]

                    for pattern in patterns:
                        matches = re.findall(pattern, script.string, re.IGNORECASE)
                        for match in matches:
                            if isinstance(match, tuple):
                                # 对于有多个分组的正则表达式
                                api_url = match[1] if len(match) > 1 else match[0]
                            else:
                                api_url = match

                            if api_url not in api_endpoints:
                                api_endpoints.append(api_url)

            # 方法2: 查找data属性中的API信息
            data_attrs = soup.find_all(attrs={'data-api': True})
            for elem in data_attrs:
                api_url = elem['data-api']
                if api_url not in api_endpoints:
                    api_endpoints.append(api_url)

            return api_endpoints

        except Exception as e:
            print(f"查找API端点失败: {e}")
            return []

    def call_api(self, api_url, method='GET', params=None, data=None):
        """调用API接口"""

        try:
            if method.upper() == 'GET':
                response = self.session.get(api_url, params=params, timeout=10)
            elif method.upper() == 'POST':
                response = self.session.post(api_url, json=data, timeout=10)
            else:
                raise ValueError(f"不支持的HTTP方法: {method}")

            response.raise_for_status()

            # 尝试解析JSON响应
            try:
                return response.json()
            except ValueError:
                # 如果不是JSON,返回文本
                return response.text

        except Exception as e:
            print(f"调用API失败 {api_url}: {e}")
            return None

    def scrape_via_api(self, page_url, api_pattern=None):
        """通过API获取数据"""

        print(f"分析页面: {page_url}")

        # 查找API端点
        endpoints = self.find_api_endpoints(page_url)
        print(f"找到 {len(endpoints)} 个API端点")

        if api_pattern:
            # 筛选匹配特定模式的API
            endpoints = [ep for ep in endpoints if re.search(api_pattern, ep)]
            print(f"匹配模式的API端点: {len(endpoints)}")

        results = {}

        for endpoint in endpoints[:5]:  # 限制测试前5个
            print(f"\n测试API: {endpoint}")

            # 调用API
            api_data = self.call_api(endpoint)

            if api_data:
                results[endpoint] = api_data

                # 显示API响应摘要
                if isinstance(api_data, dict):
                    print(f"响应类型: JSON对象 (键: {list(api_data.keys())})")
                elif isinstance(api_data, list):
                    print(f"响应类型: JSON数组 (长度: {len(api_data)})")
                else:
                    print(f"响应类型: {type(api_data).__name__}")

        return results

# 使用示例
if __name__ == "__main__":
    scraper = APIScraper()

    # 示例:分析一个使用API加载数据的页面
    page_url = "https://jsonplaceholder.typicode.com/"  # 示例API网站

    # 通过API获取数据
    api_results = scraper.scrape_via_api(
        page_url,
        api_pattern=r'/posts'  # 只获取posts相关的API
    )

    if api_results:
        print(f"\n获取到 {len(api_results)} 个API的数据")

        # 保存第一个API的数据
        first_api = list(api_results.keys())[0]
        data = api_results[first_api]

        if isinstance(data, list) and len(data) > 0:
            print(f"\n第一个API的示例数据:")
            print(f"API端点: {first_api}")
            print(f"数据条数: {len(data)}")

            # 显示第一条数据
            first_item = data[0]
            print(f"\n第一条数据:")
            for key, value in list(first_item.items())[:5]:  # 只显示前5个字段
                print(f"  {key}: {value}")

            # 保存到文件
            with open('api_data.json', 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            print(f"\n数据已保存到 api_data.json")

最佳实践

尊重robots.txt

遵守网站的爬虫协议,避免爬取禁止的内容。
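可以用标准库的 urllib.robotparser 在发送请求前做检查(最小示意):
from urllib.robotparser import RobotFileParser

rp = RobotFileParser()
rp.set_url('https://example.com/robots.txt')
rp.read()  # 下载并解析robots.txt

url = 'https://example.com/some/page'
if rp.can_fetch('MyScraper/1.0', url):
    print('robots.txt 允许爬取,可以发送请求')
else:
    print('robots.txt 禁止爬取该页面')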

设置延迟

在请求之间添加延迟,避免给服务器造成过大压力。

import time
time.sleep(1)  # 延迟1秒
使用User-Agent

设置合理的User-Agent,模拟真实浏览器。

headers = {
    'User-Agent': '合理的UA'
}
错误处理

完善的错误处理机制,处理网络异常和解析错误。

try:
    response = requests.get(url)
except requests.exceptions.RequestException as e:
    print(f"请求失败: {e}")
数据持久化

及时保存爬取的数据,防止数据丢失。

# 保存到JSON
import json
with open('data.json', 'w') as f:
    json.dump(data, f)
会话重用

使用Session对象复用连接,提高性能。

session = requests.Session()
session.get(url1)
session.get(url2)  # 复用连接
生产环境爬虫框架示例
import requests
from bs4 import BeautifulSoup
import time
import json
import logging
from urllib.parse import urljoin, urlparse
from datetime import datetime
import random

class ProductionScraper:
    """生产环境级别的爬虫框架"""

    def __init__(self, config):
        self.config = config
        self.setup_logging()
        self.session = self.create_session()
        self.visited_urls = set()
        self.data_buffer = []

    def setup_logging(self):
        """配置日志"""

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('scraper.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def create_session(self):
        """创建配置好的Session"""

        session = requests.Session()

        # 配置请求头
        headers = {
            'User-Agent': self.config.get('user_agent',
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

        session.headers.update(headers)

        # 配置代理(如果需要)
        if 'proxy' in self.config:
            session.proxies.update(self.config['proxy'])

        return session

    def get_with_retry(self, url, max_retries=3):
        """带重试机制的GET请求"""

        for attempt in range(max_retries):
            try:
                # 随机延迟,避免模式化请求
                delay = random.uniform(self.config.get('min_delay', 1),
                                      self.config.get('max_delay', 3))
                time.sleep(delay)

                response = self.session.get(
                    url,
                    timeout=self.config.get('timeout', 30),
                    allow_redirects=True
                )

                response.raise_for_status()

                # 检查响应内容类型
                content_type = response.headers.get('content-type', '')
                if 'html' not in content_type.lower() and 'text' not in content_type.lower():
                    self.logger.warning(f"非HTML响应: {content_type}")

                return response

            except requests.exceptions.RequestException as e:
                self.logger.warning(f"请求失败 (尝试 {attempt+1}/{max_retries}): {e}")

                if attempt == max_retries - 1:
                    self.logger.error(f"所有重试都失败: {url}")
                    raise

                # 指数退避
                time.sleep(2 ** attempt)

    def parse_page(self, response):
        """解析页面内容"""

        try:
            # 设置编码
            response.encoding = response.apparent_encoding

            # 使用BeautifulSoup解析
            soup = BeautifulSoup(response.text, self.config.get('parser', 'lxml'))

            # 提取数据
            data = self.extract_data(soup, response.url)

            # 提取链接(用于进一步爬取)
            links = self.extract_links(soup, response.url)

            return {
                'data': data,
                'links': links,
                'url': response.url,
                'status_code': response.status_code,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"解析页面失败 {response.url}: {e}")
            return None

    def extract_data(self, soup, url):
        """提取数据(需要子类实现)"""

        # 这是一个示例实现,实际使用时需要根据具体网站重写
        data = {
            'title': soup.title.string if soup.title else '',
            'url': url,
            'text_length': len(soup.get_text()),
            'links_count': len(soup.find_all('a')),
        }

        return data

    def extract_links(self, soup, base_url):
        """提取页面中的链接"""

        links = []

        for a in soup.find_all('a', href=True):
            href = a['href']

            # 跳过非HTTP链接
            if href.startswith(('javascript:', 'mailto:', 'tel:', '#')):
                continue

            # 处理相对链接
            full_url = urljoin(base_url, href)

            # 规范化URL
            parsed = urlparse(full_url)
            normalized = parsed._replace(fragment='', params='')

            # 只保留特定域名的链接
            if self.config.get('allowed_domains'):
                if parsed.netloc not in self.config['allowed_domains']:
                    continue

            links.append(normalized.geturl())

        return links

    def save_data(self, data, filename=None):
        """保存数据"""

        if not filename:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f'data_{timestamp}.json'

        try:
            with open(filename, 'a', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
                f.write('\n')

            self.logger.info(f"数据已保存到 {filename}")

        except Exception as e:
            self.logger.error(f"保存数据失败: {e}")

    def run(self, start_urls):
        """运行爬虫"""

        self.logger.info(f"开始爬取,初始URL: {len(start_urls)} 个")

        urls_to_visit = list(start_urls)
        max_pages = self.config.get('max_pages', 100)
        pages_crawled = 0

        while urls_to_visit and pages_crawled < max_pages:
            url = urls_to_visit.pop(0)

            # 检查是否已经访问过
            if url in self.visited_urls:
                continue

            self.logger.info(f"爬取 ({pages_crawled+1}/{max_pages}): {url}")

            try:
                # 获取页面
                response = self.get_with_retry(url)

                # 解析页面
                result = self.parse_page(response)

                if result:
                    # 保存数据
                    self.save_data(result['data'])

                    # 添加到已访问列表
                    self.visited_urls.add(url)

                    # 添加新链接到待访问列表
                    for link in result['links']:
                        if link not in self.visited_urls and link not in urls_to_visit:
                            urls_to_visit.append(link)

                    pages_crawled += 1

            except Exception as e:
                self.logger.error(f"处理URL失败 {url}: {e}")

        self.logger.info(f"爬取完成,共爬取 {pages_crawled} 个页面")

# 配置示例
config = {
    'user_agent': 'MyScraper/1.0 (+https://example.com/bot)',
    'parser': 'lxml',
    'timeout': 30,
    'min_delay': 1,
    'max_delay': 3,
    'max_pages': 50,
    'allowed_domains': ['example.com', 'www.example.com'],
}

# 使用示例
if __name__ == "__main__":
    scraper = ProductionScraper(config)

    # 开始爬取
    scraper.run(['https://example.com'])

完整项目:电影信息爬虫

1
项目目标

爬取电影网站的信息,包括电影名称、评分、导演、演员、简介等,并保存到数据库。

2
完整代码实现
电影信息爬虫完整代码
import requests
from bs4 import BeautifulSoup
import sqlite3
import json
import time
import re
from datetime import datetime
from urllib.parse import urljoin, urlparse
import logging

class MovieScraper:
    """电影信息爬虫"""

    def __init__(self, db_path='movies.db'):
        self.base_url = 'https://movie.example.com'  # 示例网站
        self.session = requests.Session()
        self.setup_session()
        self.setup_database(db_path)
        self.setup_logging()

        # 爬取统计
        self.stats = {
            'total_crawled': 0,
            'successful': 0,
            'failed': 0,
            'start_time': datetime.now()
        }

    def setup_logging(self):
        """配置日志"""

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('movie_scraper.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def setup_session(self):
        """配置Session"""

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Referer': 'https://www.google.com/',
        }

        self.session.headers.update(headers)

        # 为所有请求设置默认超时(先保存原始方法,否则 lambda 会递归调用自身)
        original_request = self.session.request

        def request_with_timeout(method, url, **kwargs):
            kwargs.setdefault('timeout', (10, 30))
            return original_request(method, url, **kwargs)

        self.session.request = request_with_timeout

    def setup_database(self, db_path):
        """设置数据库"""

        self.conn = sqlite3.connect(db_path)
        self.cursor = self.conn.cursor()

        # 创建电影表
        self.cursor.execute('''
        CREATE TABLE IF NOT EXISTS movies (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            original_title TEXT,
            year INTEGER,
            rating REAL,
            rating_count INTEGER,
            director TEXT,
            actors TEXT,  -- JSON数组
            genres TEXT,  -- JSON数组
            duration TEXT,
            release_date TEXT,
            country TEXT,
            language TEXT,
            summary TEXT,
            poster_url TEXT,
            detail_url TEXT UNIQUE,
            crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''')

        # 创建爬取日志表
        self.cursor.execute('''
        CREATE TABLE IF NOT EXISTS crawl_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT,
            status_code INTEGER,
            success BOOLEAN,
            error_message TEXT,
            crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''')

        self.conn.commit()

    def fetch_movie_list(self, page=1):
        """获取电影列表页"""

        list_url = f'{self.base_url}/list?page={page}'

        try:
            response = self.session.get(list_url)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'lxml')

            # 提取电影链接
            movie_links = []

            # 根据实际网站结构调整选择器
            movie_items = soup.select('.movie-item')

            for item in movie_items:
                link_elem = item.select_one('a.movie-link')
                if link_elem and link_elem.get('href'):
                    movie_url = urljoin(self.base_url, link_elem['href'])
                    movie_links.append(movie_url)

            # 如果没有找到特定选择器的元素,尝试其他方法
            if not movie_links:
                # 查找所有包含/movie/的链接
                all_links = soup.find_all('a', href=re.compile(r'/movie/\d+'))
                for link in all_links:
                    movie_url = urljoin(self.base_url, link['href'])
                    movie_links.append(movie_url)

            return movie_links

        except Exception as e:
            self.logger.error(f"获取电影列表失败 (第{page}页): {e}")
            return []

    def parse_movie_detail(self, movie_url):
        """解析电影详情页"""

        try:
            response = self.session.get(movie_url)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'lxml')

            movie_data = {
                'detail_url': movie_url,
                'title': '',
                'original_title': '',
                'year': None,
                'rating': None,
                'rating_count': None,
                'director': '',
                'actors': [],
                'genres': [],
                'duration': '',
                'release_date': '',
                'country': '',
                'language': '',
                'summary': '',
                'poster_url': ''
            }

            # 提取电影标题
            title_elem = soup.select_one('h1.movie-title')
            if title_elem:
                movie_data['title'] = title_elem.get_text(strip=True)

            # 提取原始标题
            original_elem = soup.select_one('.original-title')
            if original_elem:
                movie_data['original_title'] = original_elem.get_text(strip=True)

            # 提取年份
            year_match = re.search(r'(\d{4})', movie_data.get('title', ''))
            if year_match:
                movie_data['year'] = int(year_match.group(1))

            # 提取评分
            rating_elem = soup.select_one('.rating-value')
            if rating_elem:
                try:
                    movie_data['rating'] = float(rating_elem.get_text(strip=True))
                except ValueError:
                    pass

            # 提取评分人数
            rating_count_elem = soup.select_one('.rating-count')
            if rating_count_elem:
                count_text = rating_count_elem.get_text(strip=True)
                count_match = re.search(r'(\d+)', count_text.replace(',', ''))
                if count_match:
                    movie_data['rating_count'] = int(count_match.group(1))

            # 提取导演
            director_elem = soup.select_one('.director')
            if director_elem:
                movie_data['director'] = director_elem.get_text(strip=True)

            # 提取演员
            actor_elems = soup.select('.actor-list .actor')
            for actor_elem in actor_elems:
                actor_name = actor_elem.get_text(strip=True)
                if actor_name:
                    movie_data['actors'].append(actor_name)

            # 提取类型
            genre_elems = soup.select('.genre-tag')
            for genre_elem in genre_elems:
                genre = genre_elem.get_text(strip=True)
                if genre:
                    movie_data['genres'].append(genre)

            # 提取时长
            duration_elem = soup.select_one('.duration')
            if duration_elem:
                movie_data['duration'] = duration_elem.get_text(strip=True)

            # 提取上映日期
            release_elem = soup.select_one('.release-date')
            if release_elem:
                movie_data['release_date'] = release_elem.get_text(strip=True)

            # 提取国家/地区
            country_elem = soup.select_one('.country')
            if country_elem:
                movie_data['country'] = country_elem.get_text(strip=True)

            # 提取语言
            language_elem = soup.select_one('.language')
            if language_elem:
                movie_data['language'] = language_elem.get_text(strip=True)

            # 提取简介
            summary_elem = soup.select_one('.summary')
            if summary_elem:
                movie_data['summary'] = summary_elem.get_text(strip=True)

            # 提取海报URL
            poster_elem = soup.select_one('.movie-poster img')
            if poster_elem and poster_elem.get('src'):
                poster_url = poster_elem['src']
                if not poster_url.startswith('http'):
                    poster_url = urljoin(self.base_url, poster_url)
                movie_data['poster_url'] = poster_url

            return movie_data

        except Exception as e:
            self.logger.error(f"解析电影详情失败 {movie_url}: {e}")

            # 记录错误到数据库
            self.log_crawl_result(movie_url, None, False, str(e))

            return None

    def save_movie_to_db(self, movie_data):
        """保存电影数据到数据库"""

        try:
            # 转换列表为JSON字符串
            actors_json = json.dumps(movie_data.get('actors', []), ensure_ascii=False)
            genres_json = json.dumps(movie_data.get('genres', []), ensure_ascii=False)

            # 检查电影是否已存在
            self.cursor.execute(
                'SELECT id FROM movies WHERE detail_url = ?',
                (movie_data['detail_url'],)
            )

            if self.cursor.fetchone():
                # 更新现有记录
                self.cursor.execute('''
                UPDATE movies SET
                    title = ?, original_title = ?, year = ?, rating = ?,
                    rating_count = ?, director = ?, actors = ?, genres = ?,
                    duration = ?, release_date = ?, country = ?,
                    language = ?, summary = ?, poster_url = ?
                WHERE detail_url = ?
                ''', (
                    movie_data.get('title'),
                    movie_data.get('original_title'),
                    movie_data.get('year'),
                    movie_data.get('rating'),
                    movie_data.get('rating_count'),
                    movie_data.get('director'),
                    actors_json,
                    genres_json,
                    movie_data.get('duration'),
                    movie_data.get('release_date'),
                    movie_data.get('country'),
                    movie_data.get('language'),
                    movie_data.get('summary'),
                    movie_data.get('poster_url'),
                    movie_data['detail_url']
                ))
            else:
                # 插入新记录
                self.cursor.execute('''
                INSERT INTO movies (
                    title, original_title, year, rating, rating_count,
                    director, actors, genres, duration, release_date,
                    country, language, summary, poster_url, detail_url
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    movie_data.get('title'),
                    movie_data.get('original_title'),
                    movie_data.get('year'),
                    movie_data.get('rating'),
                    movie_data.get('rating_count'),
                    movie_data.get('director'),
                    actors_json,
                    genres_json,
                    movie_data.get('duration'),
                    movie_data.get('release_date'),
                    movie_data.get('country'),
                    movie_data.get('language'),
                    movie_data.get('summary'),
                    movie_data.get('poster_url'),
                    movie_data['detail_url']
                ))

            self.conn.commit()

            # 记录成功的爬取
            self.log_crawl_result(
                movie_data['detail_url'],
                200,
                True,
                None
            )

            self.stats['successful'] += 1
            return True

        except Exception as e:
            self.logger.error(f"保存电影数据失败: {e}")

            # 记录失败的爬取
            self.log_crawl_result(
                movie_data.get('detail_url', 'unknown'),
                None,
                False,
                str(e)
            )

            self.stats['failed'] += 1
            return False

    def log_crawl_result(self, url, status_code, success, error_message):
        """记录爬取结果到数据库"""

        try:
            self.cursor.execute('''
            INSERT INTO crawl_log (url, status_code, success, error_message)
            VALUES (?, ?, ?, ?)
            ''', (url, status_code, success, error_message))

            self.conn.commit()
        except Exception as e:
            self.logger.error(f"记录爬取日志失败: {e}")

    def crawl_movies(self, start_page=1, max_pages=10, delay=2):
        """爬取电影数据"""

        self.logger.info(f"开始爬取电影数据,从第{start_page}页开始,最多{max_pages}页")

        for page in range(start_page, start_page + max_pages):
            self.logger.info(f"正在爬取第{page}页...")

            # 获取电影列表
            movie_links = self.fetch_movie_list(page)

            if not movie_links:
                self.logger.warning(f"第{page}页没有找到电影链接")
                break

            self.logger.info(f"第{page}页找到 {len(movie_links)} 部电影")

            # 爬取每部电影的详情
            for i, movie_url in enumerate(movie_links, 1):
                self.stats['total_crawled'] += 1

                self.logger.info(f"正在爬取电影 {i}/{len(movie_links)}: {movie_url}")

                # 解析电影详情
                movie_data = self.parse_movie_detail(movie_url)

                if movie_data:
                    # 保存到数据库
                    success = self.save_movie_to_db(movie_data)

                    if success:
                        self.logger.info(f"成功保存: {movie_data.get('title', '未知标题')}")
                    else:
                        self.logger.error(f"保存失败: {movie_url}")
                else:
                    self.logger.error(f"解析失败: {movie_url}")
                    self.stats['failed'] += 1

                # 延迟,避免请求过快
                time.sleep(delay)

            # 页面间延迟
            time.sleep(delay * 2)

        # 打印统计信息
        self.print_stats()

    def print_stats(self):
        """打印爬取统计信息"""

        elapsed = datetime.now() - self.stats['start_time']

        print("\n" + "="*50)
        print("爬取统计")
        print("="*50)
        print(f"开始时间: {self.stats['start_time']}")
        print(f"总耗时: {elapsed}")
        print(f"尝试爬取总数: {self.stats['total_crawled']}")
        print(f"成功数: {self.stats['successful']}")
        print(f"失败数: {self.stats['failed']}")

        if self.stats['total_crawled'] > 0:
            success_rate = self.stats['successful'] / self.stats['total_crawled'] * 100
            print(f"成功率: {success_rate:.1f}%")

        # 从数据库获取更多统计
        self.cursor.execute('SELECT COUNT(*) FROM movies')
        total_movies = self.cursor.fetchone()[0]
        print(f"数据库中的电影总数: {total_movies}")

        self.cursor.execute('SELECT COUNT(DISTINCT url) FROM crawl_log WHERE success = 1')
        successful_crawls = self.cursor.fetchone()[0]
        print(f"成功爬取的唯一URL数: {successful_crawls}")

    def export_to_json(self, filename='movies_export.json'):
        """导出数据到JSON文件"""

        try:
            self.cursor.execute('SELECT * FROM movies')
            movies = self.cursor.fetchall()

            # 获取列名
            column_names = [description[0] for description in self.cursor.description]

            # 转换为字典列表
            movies_dict = []
            for movie in movies:
                movie_dict = dict(zip(column_names, movie))

                # 解析JSON字段
                if movie_dict.get('actors'):
                    movie_dict['actors'] = json.loads(movie_dict['actors'])
                if movie_dict.get('genres'):
                    movie_dict['genres'] = json.loads(movie_dict['genres'])

                movies_dict.append(movie_dict)

            # 保存到文件
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(movies_dict, f, ensure_ascii=False, indent=2)

            self.logger.info(f"已导出 {len(movies_dict)} 部电影到 {filename}")

        except Exception as e:
            self.logger.error(f"导出数据失败: {e}")

    def close(self):
        """关闭数据库连接"""

        self.conn.close()
        self.logger.info("数据库连接已关闭")

# 使用示例
if __name__ == "__main__":
    # 创建爬虫实例
    scraper = MovieScraper(db_path='movies_data.db')

    try:
        # 开始爬取
        scraper.crawl_movies(
            start_page=1,
            max_pages=3,  # 爬取前3页
            delay=1  # 1秒延迟
        )

        # 导出数据
        scraper.export_to_json('movies.json')

        # 显示示例数据
        scraper.cursor.execute('SELECT title, rating, director FROM movies LIMIT 5')
        sample_movies = scraper.cursor.fetchall()

        print("\n示例数据:")
        for movie in sample_movies:
            print(f"标题: {movie[0]}, 评分: {movie[1]}, 导演: {movie[2]}")

    finally:
        # 确保关闭连接
        scraper.close()
3
项目总结
项目名称: 电影信息爬虫
技术栈: Requests + BeautifulSoup + SQLite
功能特点: 自动爬取、数据清洗、数据库存储、错误处理、日志记录
数据字段: 15+ 个字段,包括评分、演员、类型等
输出格式: SQLite数据库 + JSON导出
扩展性: 支持多线程、分布式、定时任务(多线程扩展的示意见下方)
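关于多线程扩展,一个常见做法是用标准库 concurrent.futures 并发抓取多个页面。下面是一个简化示意,fetch_one 是假设的抓取函数,实际可换成上文 MovieScraper.parse_movie_detail 这类方法:
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from bs4 import BeautifulSoup

def fetch_one(url):
    """抓取单个页面并返回标题(仅作示意的简化函数)"""
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'lxml')
    return url, soup.title.string if soup.title else ''

urls = ['https://example.com/page1', 'https://example.com/page2']

# 限制并发数,避免给目标站点造成过大压力
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(fetch_one, u): u for u in urls}
    for future in as_completed(futures):
        try:
            url, title = future.result()
            print(f"{url} -> {title}")
        except Exception as e:
            print(f"{futures[future]} 抓取失败: {e}")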

总结

Requests 结合 BeautifulSoup 是 Python 网络爬虫和数据抓取的黄金组合。关键要点:

  • 分工明确: Requests 负责网络通信,BeautifulSoup 负责 HTML 解析
  • 灵活的选择器: 支持多种方式查找和提取元素
  • 强大的数据处理: 可以处理文本、属性、嵌套结构等复杂数据
  • 应对各种场景: 从简单静态页面到复杂动态页面
  • 生产就绪: 通过合理的设计可以构建企业级爬虫应用

掌握这个组合,你将能够从互联网上获取和处理各种数据,为数据分析、机器学习、市场研究等应用提供数据支持。