BeautifulSoup与requests库配合使用

BeautifulSoup 负责解析 HTML,而 requests 库负责获取网页内容。两者结合是 Python 网页爬虫的标准配置。本章将详细介绍如何配合使用这两个强大的库,构建完整的网页抓取和解析流程。

黄金组合:requests + BeautifulSoup 是 Python 网页爬虫最常用的组合,requests 负责网络请求,BeautifulSoup 负责数据解析。

requests库基础

在开始配合使用前,先了解 requests 库的基本用法。

安装requests

pip install requests
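
本章示例还会用到 BeautifulSoup 本身以及 lxml 解析器(代码中写 'lxml' 的地方需要它),如果尚未安装,可以一并装上:

pip install beautifulsoup4 lxml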

基本GET请求

import requests

# 发送GET请求
response = requests.get('https://httpbin.org/get')

# 检查状态码
print(f"状态码: {response.status_code}")
print(f"响应头: {response.headers['content-type']}")
print(f"编码: {response.encoding}")
print(f"内容长度: {len(response.content)} bytes")

1. 基础配合使用

1.1 最简单的配合示例

import requests
from bs4 import BeautifulSoup

def get_page_title(url):
    """获取网页标题"""
    try:
        # 发送请求
        response = requests.get(url)
        response.raise_for_status()  # 检查请求是否成功

        # 解析HTML
        soup = BeautifulSoup(response.content, 'lxml')

        # 提取标题
        title = soup.title.string if soup.title else "无标题"
        return title

    except requests.exceptions.RequestException as e:
        return f"请求错误: {e}"
    except Exception as e:
        return f"解析错误: {e}"

# 使用示例
url = 'https://httpbin.org/html'
title = get_page_title(url)
print(f"页面标题: {title}")

1.2 获取并解析完整页面

import requests
from bs4 import BeautifulSoup

def fetch_and_parse(url):
    """获取并解析网页"""
    try:
        # 发送请求
        print(f"正在获取: {url}")
        response = requests.get(url)

        # 检查响应
        if response.status_code != 200:
            print(f"请求失败,状态码: {response.status_code}")
            return None

        # 自动检测编码
        response.encoding = response.apparent_encoding

        # 使用BeautifulSoup解析
        soup = BeautifulSoup(response.text, 'lxml')

        # 提取基本信息
        info = {
            'url': url,
            'status_code': response.status_code,
            'encoding': response.encoding,
            'title': soup.title.string if soup.title else None,
            'meta_description': '',
            'h1_count': len(soup.find_all('h1')),
            'link_count': len(soup.find_all('a')),
            'image_count': len(soup.find_all('img'))
        }

        # 提取meta描述
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        if meta_desc:
            info['meta_description'] = meta_desc.get('content', '')

        return info

    except Exception as e:
        print(f"错误: {e}")
        return None

# 使用示例
info = fetch_and_parse('https://httpbin.org/html')
if info:
    for key, value in info.items():
        print(f"{key}: {value}")

2. 处理HTTP请求参数

2.1 查询参数

import requests
from bs4 import BeautifulSoup

# 方法1:手动拼接URL(仅作演示,特殊字符需要自行做URL编码)
base_url = 'https://httpbin.org/get'
params = {'key1': 'value1', 'key2': 'value2'}
url_with_params = f"{base_url}?key1=value1&key2=value2"

# 方法2:使用params参数(推荐,requests会自动编码)
response = requests.get('https://httpbin.org/get', params=params)

# httpbin 的 /get 接口返回 JSON,直接用 response.json() 查看参数
print(f"响应URL: {response.url}")
print(f"查询参数: {response.json()['args']}")

# 实际搜索示例
def google_search(query):
    """模拟Google搜索"""
    url = 'https://www.google.com/search'
    params = {
        'q': query,
        'hl': 'zh-CN',
        'num': 10  # 结果数量
    }

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }

    response = requests.get(url, params=params, headers=headers)

    if response.status_code == 200:
        soup = BeautifulSoup(response.text, 'lxml')
        # 提取搜索结果标题
        titles = []
        for h3 in soup.find_all('h3'):
            if h3.parent and h3.parent.get('href'):
                titles.append(h3.text)
        return titles
    return []

# 注意:实际使用Google搜索需要处理反爬虫机制
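
顺带一提,params 参数的另一个好处是 requests 会自动完成 URL 编码,中文、空格等字符不需要手动处理,下面是一个小演示:

import requests

# params 中可以放中文、空格等特殊字符,requests 会自动做URL编码
params = {'q': '北京 天气', 'page': 1}
response = requests.get('https://httpbin.org/get', params=params)

print(f"编码后的URL: {response.url}")
print(f"服务器收到的参数: {response.json()['args']}")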

2.2 POST请求和数据提交

import requests

# 示例1:发送表单数据
def submit_form():
    """提交表单数据"""
    url = 'https://httpbin.org/post'

    # 表单数据
    data = {
        'username': 'testuser',
        'password': 'testpass',
        'email': 'test@example.com'
    }

    # 发送POST请求
    response = requests.post(url, data=data)

    # httpbin 的 /post 接口返回 JSON,直接用 response.json() 解析
    if response.status_code == 200:
        result = response.json()
        print(f"表单数据: {result['form']}")
        return result
    return None

# 示例2:发送JSON数据
def send_json_data():
    """发送JSON数据"""
    url = 'https://httpbin.org/post'

    # JSON数据
    json_data = {
        'name': '张三',
        'age': 25,
        'skills': ['Python', 'JavaScript', 'HTML/CSS']
    }

    # 使用 json= 参数时,requests 会自动设置 Content-Type: application/json
    # 无需手动指定请求头

    # 发送POST请求
    response = requests.post(url, json=json_data)

    if response.status_code == 200:
        # httpbin 的 /post 接口返回 JSON,直接用 response.json() 解析
        result = response.json()
        print(f"JSON数据: {result['json']}")
        return result
    return None

# 运行示例
print("=== 表单提交 ===")
submit_form()

print("\n=== JSON提交 ===")
send_json_data()
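
实际项目中,POST 往往要和 BeautifulSoup 配合:先解析表单页面,收集其中的 <input> 字段(尤其是隐藏的 token),再用 requests 提交。下面是一个通用的示意写法,其中的 URL 和字段名都是假设的占位值,需要按目标网站调整:

import requests
from bs4 import BeautifulSoup

def scrape_and_submit_form(form_page_url, form_url, extra_fields):
    """先用BeautifulSoup解析表单页面,再用requests提交(示意)"""
    session = requests.Session()

    # 1. 获取并解析表单页面
    response = session.get(form_page_url, timeout=10)
    soup = BeautifulSoup(response.content, 'lxml')

    # 2. 收集表单中已有的字段(例如隐藏的csrf token)
    form_data = {}
    form = soup.find('form')
    if form:
        for input_tag in form.find_all('input', attrs={'name': True}):
            form_data[input_tag['name']] = input_tag.get('value', '')

    # 3. 填入自己的字段并提交
    form_data.update(extra_fields)
    return session.post(form_url, data=form_data, timeout=10)

# 使用示例(example.com 仅为占位,实际使用时替换为目标地址和字段)
# response = scrape_and_submit_form(
#     'https://example.com/login',
#     'https://example.com/login',
#     {'username': 'testuser', 'password': 'testpass'}
# )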

3. 请求头设置与伪装

3.1 设置请求头

import requests
from bs4 import BeautifulSoup

def get_with_headers(url):
    """使用自定义请求头获取网页"""
    # 常见请求头
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Cache-Control': 'max-age=0'
    }

    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()

        # 解析页面
        soup = BeautifulSoup(response.content, 'lxml')

        # 提取信息
        info = {
            'title': soup.title.string if soup.title else '无标题',
            'final_url': response.url,
            'headers_sent': dict(response.request.headers),
            'headers_received': dict(response.headers)
        }

        return info

    except requests.exceptions.RequestException as e:
        print(f"请求失败: {e}")
        return None

# 使用示例
info = get_with_headers('https://httpbin.org/headers')
if info:
    print("请求成功")
    print(f"页面标题: {info['title']}")
    print(f"最终URL: {info['final_url']}")
    print("\n发送的请求头:")
    for key, value in info['headers_sent'].items():
        print(f"  {key}: {value}")

3.2 随机User-Agent

import requests
from bs4 import BeautifulSoup
import random

def get_random_user_agent():
    """生成随机User-Agent"""
    user_agents = [
        # Chrome
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
        # Firefox
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:120.0) Gecko/20100101 Firefox/120.0',
        # Safari
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15',
        # Edge
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0'
    ]
    return random.choice(user_agents)

def fetch_with_random_ua(url):
    """使用随机User-Agent获取页面"""
    headers = {
        'User-Agent': get_random_user_agent(),
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
    }

    try:
        response = requests.get(url, headers=headers, timeout=10)

        if response.status_code == 200:
            soup = BeautifulSoup(response.content, 'lxml')

            # 记录使用的User-Agent
            print(f"使用的User-Agent: {headers['User-Agent'][:50]}...")
            print(f"页面标题: {soup.title.string if soup.title else '无标题'}")

            return soup
        else:
            print(f"请求失败,状态码: {response.status_code}")
            return None

    except Exception as e:
        print(f"错误: {e}")
        return None

# 使用示例
soup = fetch_with_random_ua('https://httpbin.org/user-agent')
if soup:
    # httpbin 的 /user-agent 接口返回 JSON,BeautifulSoup 会把它当作纯文本处理
    print(f"服务器收到的User-Agent: {soup.get_text(strip=True)[:100]}")

4. 会话管理与Cookies

4.1 使用Session保持会话

import requests
from bs4 import BeautifulSoup

def simulate_login():
    """模拟登录过程"""
    # 创建Session对象
    session = requests.Session()

    # 第一次请求:获取登录页面
    login_url = 'https://httpbin.org/cookies/set/sessionid/123456'
    print("1. 访问登录页面...")
    response = session.get(login_url)

    # 解析登录页面(示例)
    soup = BeautifulSoup(response.text, 'lxml')
    print(f"  页面标题: {soup.title.string if soup.title else '登录页面'}")

    # 检查Cookies
    print(f"  Cookies: {session.cookies.get_dict()}")

    # 第二次请求:访问需要登录的页面
    protected_url = 'https://httpbin.org/cookies'
    print("\n2. 访问需要登录的页面...")
    response2 = session.get(protected_url)

    # httpbin 的 /cookies 接口返回 JSON,直接用 response.json() 读取
    print(f"  返回的Cookies信息: {response2.json()['cookies']}")

    # 第三次请求:登出
    logout_url = 'https://httpbin.org/cookies/delete?sessionid'
    print("\n3. 登出...")
    response3 = session.get(logout_url)
    print(f"  登出后的Cookies: {response3.json()['cookies']}")

    return session

# 运行示例
session = simulate_login()
print(f"\n最终的Cookies: {session.cookies.get_dict()}")

4.2 手动管理Cookies

import requests

def manage_cookies_manually():
    """手动管理Cookies"""
    # 第一次请求:设置Cookies
    print("1. 设置Cookies...")
    response1 = requests.get('https://httpbin.org/cookies/set/name/value')

    # 获取服务器设置的Cookies
    cookies_from_server = response1.cookies
    print(f"  服务器设置的Cookies: {dict(cookies_from_server)}")

    # 第二次请求:发送特定的Cookies
    print("\n2. 发送自定义Cookies...")
    cookies_to_send = {
        'session_id': 'abc123',
        'user_id': '456',
        'preferences': 'dark_mode'
    }

    response2 = requests.get(
        'https://httpbin.org/cookies',
        cookies=cookies_to_send
    )

    # httpbin 的 /cookies 接口返回 JSON,直接读取即可
    print(f"  服务器收到的Cookies: {response2.json()['cookies']}")

    # 第三次请求:结合Session和手动Cookies
    print("\n3. 使用Session并更新Cookies...")
    session = requests.Session()

    # 设置初始Cookies
    session.cookies.update(cookies_to_send)

    # 发送请求
    response3 = session.get('https://httpbin.org/cookies')
    print(f"  Session Cookies: {response3.json()['cookies']}")

    # 更新Cookies
    print("\n4. 更新Cookies...")
    new_cookies = {'new_cookie': 'new_value'}
    session.cookies.update(new_cookies)

    response4 = session.get('https://httpbin.org/cookies')
    print(f"  更新后的Cookies: {response4.json()['cookies']}")

    return session

# 运行示例
session = manage_cookies_manually()
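
如果希望脚本下次运行时继续沿用这些 Cookies(例如保持登录状态),可以把它们保存到文件,下次再恢复到新的 Session 中。下面是一个用 requests.utils 辅助函数实现的简单示意:

import json
import requests

COOKIE_FILE = 'cookies.json'

def save_cookies(session, path=COOKIE_FILE):
    """把Session中的Cookies保存为JSON文件"""
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(requests.utils.dict_from_cookiejar(session.cookies), f)

def load_cookies(session, path=COOKIE_FILE):
    """从JSON文件恢复Cookies到Session"""
    with open(path, 'r', encoding='utf-8') as f:
        cookies = json.load(f)
    session.cookies.update(requests.utils.cookiejar_from_dict(cookies))

# 使用示例
session = requests.Session()
session.get('https://httpbin.org/cookies/set/sessionid/123456')
save_cookies(session)

new_session = requests.Session()
load_cookies(new_session)
print(new_session.cookies.get_dict())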

5. 处理响应和错误

5.1 响应处理与重试

import requests
from bs4 import BeautifulSoup
import time

def fetch_with_retry(url, max_retries=3, delay=2):
    """带重试机制的网页获取"""
    for attempt in range(max_retries):
        try:
            print(f"第{attempt+1}次尝试获取: {url}")

            response = requests.get(
                url,
                timeout=10,
                headers={'User-Agent': 'Mozilla/5.0'}
            )

            # 检查状态码
            if response.status_code == 200:
                soup = BeautifulSoup(response.content, 'lxml')
                print("请求成功")
                return soup

            elif response.status_code == 404:
                print("页面不存在")
                return None

            elif response.status_code == 403:
                print("访问被拒绝")
                time.sleep(delay * 2)  # 403错误等待更长时间
                continue

            elif response.status_code == 429:
                print("请求过于频繁,等待后重试")
                time.sleep(delay * 3)
                continue

            elif response.status_code >= 500:
                print(f"服务器错误: {response.status_code}")
                time.sleep(delay)
                continue

            else:
                print(f"未知状态码: {response.status_code}")
                time.sleep(delay)
                continue

        except requests.exceptions.Timeout:
            print("请求超时")
            time.sleep(delay)

        except requests.exceptions.ConnectionError:
            print("连接错误")
            time.sleep(delay)

        except Exception as e:
            print(f"其他错误: {e}")
            time.sleep(delay)

    print(f"经过{max_retries}次尝试后仍失败")
    return None

# 使用示例
# 用一个会返回HTML内容的地址做演示
soup = fetch_with_retry('https://httpbin.org/html')
if soup is not None:
    print(f"页面标题: {soup.title.string if soup.title else '无标题'}")

5.2 处理不同的响应格式

import requests
from bs4 import BeautifulSoup
import json

def handle_different_responses(url):
    """处理不同类型的响应"""
    try:
        response = requests.get(url, timeout=10)

        # 获取内容类型
        content_type = response.headers.get('content-type', '').lower()

        print(f"URL: {url}")
        print(f"状态码: {response.status_code}")
        print(f"内容类型: {content_type}")

        # 根据内容类型处理
        if 'text/html' in content_type:
            # HTML响应
            soup = BeautifulSoup(response.content, 'lxml')
            title = soup.title.string if soup.title else '无标题'
            print(f"HTML页面标题: {title}")

            # 提取有用信息
            links = []
            for a in soup.find_all('a', href=True):
                links.append({
                    'text': a.text.strip()[:50],
                    'href': a['href']
                })

            return {
                'type': 'html',
                'title': title,
                'links': links[:5]  # 只返回前5个链接
            }

        elif 'application/json' in content_type:
            # JSON响应
            try:
                data = response.json()
                print(f"JSON数据示例: {json.dumps(data, ensure_ascii=False)[:200]}...")
                return {'type': 'json', 'data': data}
            except ValueError:
                # JSON解析失败,退回按纯文本处理
                return {'type': 'text', 'content': response.text}

        elif 'text/plain' in content_type:
            # 纯文本响应
            text = response.text[:200] + '...' if len(response.text) > 200 else response.text
            print(f"纯文本内容: {text}")
            return {'type': 'text', 'content': response.text}

        elif 'image/' in content_type:
            # 图片响应
            print(f"图片大小: {len(response.content)} bytes")
            return {'type': 'image', 'size': len(response.content)}

        else:
            # 其他类型
            print(f"二进制数据,大小: {len(response.content)} bytes")
            return {'type': 'binary', 'size': len(response.content)}

    except Exception as e:
        print(f"错误: {e}")
        return {'type': 'error', 'message': str(e)}

# 测试不同类型的响应
test_urls = [
    'https://httpbin.org/html',          # HTML
    'https://httpbin.org/json',          # JSON
    'https://httpbin.org/robots.txt',    # 纯文本
    'https://httpbin.org/image/png',     # 图片
]

for url in test_urls:
    print("\n" + "="*50)
    result = handle_different_responses(url)
    print(f"处理结果类型: {result['type']}")

6. 高级技巧与实战

6.1 分页数据抓取

import requests
from bs4 import BeautifulSoup
import time

def scrape_paginated_data(base_url, max_pages=5):
    """抓取分页数据"""
    all_data = []

    for page in range(1, max_pages + 1):
        print(f"正在抓取第 {page} 页...")

        # 构建分页URL
        if '?' in base_url:
            url = f"{base_url}&page={page}"
        else:
            url = f"{base_url}?page={page}"

        try:
            # 发送请求
            response = requests.get(
                url,
                headers={'User-Agent': 'Mozilla/5.0'},
                timeout=10
            )

            if response.status_code != 200:
                print(f"第 {page} 页请求失败,状态码: {response.status_code}")
                break

            # 解析页面
            soup = BeautifulSoup(response.content, 'lxml')

            # 示例:提取文章列表(根据实际页面结构调整)
            articles = soup.find_all('article') or soup.find_all('div', class_='item')

            if not articles:
                print(f"第 {page} 页没有找到数据")
                break

            page_data = []
            for article in articles:
                # 提取文章信息
                title_elem = article.find('h2') or article.find('h3')
                title = title_elem.text.strip() if title_elem else '无标题'

                link_elem = article.find('a')
                link = link_elem['href'] if link_elem and link_elem.has_attr('href') else ''

                content_elem = article.find('p')
                content = content_elem.text.strip() if content_elem else ''

                item = {
                    'title': title,
                    'link': link,
                    'content': content[:100] + '...' if len(content) > 100 else content,
                    'page': page
                }
                page_data.append(item)

            all_data.extend(page_data)
            print(f"第 {page} 页找到 {len(page_data)} 条数据")

            # 检查是否还有下一页
            next_button = soup.find('a', string=lambda x: x and '下一页' in x)
            if not next_button:
                next_button = soup.find('a', class_='next')

            if not next_button:
                print("没有找到下一页按钮,停止抓取")
                break

            # 避免请求过快
            time.sleep(1)

        except Exception as e:
            print(f"第 {page} 页抓取出错: {e}")
            break

    print(f"\n总共抓取 {len(all_data)} 条数据")
    return all_data

# 使用示例(注意:这是一个通用示例,实际URL需要替换)
# data = scrape_paginated_data('https://example.com/articles', max_pages=3)
# for item in data[:3]:  # 显示前3条
#     print(f"标题: {item['title']}, 页码: {item['page']}")

6.2 并发请求与解析

import requests
from bs4 import BeautifulSoup
import concurrent.futures
import time

def fetch_single_page(url):
    """获取单个页面"""
    try:
        response = requests.get(url, timeout=10, headers={'User-Agent': 'Mozilla/5.0'})
        if response.status_code == 200:
            return response.content
        else:
            print(f"请求失败: {url}, 状态码: {response.status_code}")
            return None
    except Exception as e:
        print(f"请求错误: {url}, 错误: {e}")
        return None

def parse_page_content(html_content, url):
    """解析页面内容"""
    if not html_content:
        return None

    try:
        soup = BeautifulSoup(html_content, 'lxml')

        info = {
            'url': url,
            'title': soup.title.string if soup.title else '无标题',
            'h1_count': len(soup.find_all('h1')),
            'paragraph_count': len(soup.find_all('p')),
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
        }

        return info
    except Exception as e:
        print(f"解析错误: {url}, 错误: {e}")
        return None

def fetch_and_parse_concurrently(urls, max_workers=5):
    """并发获取和解析多个页面"""
    results = []

    # 使用线程池
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 提交获取任务
        future_to_url = {
            executor.submit(fetch_single_page, url): url
            for url in urls
        }

        # 处理完成的任务
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                html_content = future.result()
                if html_content:
                    # 解析内容
                    parsed_info = parse_page_content(html_content, url)
                    if parsed_info:
                        results.append(parsed_info)
                        print(f"完成: {url}")
            except Exception as e:
                print(f"处理异常: {url}, 错误: {e}")

    return results

# 使用示例
test_urls = [
    'https://httpbin.org/html',
    'https://httpbin.org/xml',
    'https://httpbin.org/robots.txt',
    'https://httpbin.org/json',
]

print("开始并发获取页面...")
start_time = time.time()

results = fetch_and_parse_concurrently(test_urls, max_workers=3)

end_time = time.time()
print(f"\n总共获取 {len(results)} 个页面")
print(f"总耗时: {end_time - start_time:.2f} 秒")

# 显示结果
for result in results:
    print(f"\nURL: {result['url']}")
    print(f"标题: {result['title']}")
    print(f"段落数: {result['paragraph_count']}")

7. 实际项目示例

7.1 构建简单的网络爬虫

import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import time
import os

class SimpleWebCrawler:
    """简单的网络爬虫"""

    def __init__(self, start_url, max_pages=10, delay=1):
        self.start_url = start_url
        self.max_pages = max_pages
        self.delay = delay
        self.visited = set()
        self.to_visit = set([start_url])
        self.results = []

    def is_valid_url(self, url):
        """检查URL是否有效"""
        parsed = urlparse(url)
        return bool(parsed.netloc) and bool(parsed.scheme)

    def get_all_links(self, soup, base_url):
        """获取页面所有链接"""
        links = set()

        for a_tag in soup.find_all('a', href=True):
            href = a_tag['href']

            # 转换为绝对URL
            absolute_url = urljoin(base_url, href)

            # 过滤无效URL
            if self.is_valid_url(absolute_url):
                links.add(absolute_url)

        return links

    def crawl_page(self, url):
        """抓取单个页面"""
        try:
            print(f"抓取: {url}")

            # 发送请求
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)

            if response.status_code != 200:
                print(f"  状态码: {response.status_code}")
                return []

            # 解析页面
            soup = BeautifulSoup(response.content, 'lxml')

            # 提取信息
            page_info = {
                'url': url,
                'title': soup.title.string if soup.title else '无标题',
                'meta_description': '',
                'h1_count': len(soup.find_all('h1')),
                'p_count': len(soup.find_all('p')),
                'img_count': len(soup.find_all('img')),
                'links_count': len(soup.find_all('a'))
            }

            # 提取meta描述
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc:
                page_info['meta_description'] = meta_desc.get('content', '')

            self.results.append(page_info)

            # 提取链接
            new_links = self.get_all_links(soup, url)

            # 避免请求过快
            time.sleep(self.delay)

            return new_links

        except Exception as e:
            print(f"  错误: {e}")
            return []

    def crawl(self):
        """开始爬取"""
        print(f"开始爬取,起始URL: {self.start_url}")
        print(f"最大页面数: {self.max_pages}")

        while self.to_visit and len(self.visited) < self.max_pages:
            # 获取下一个URL
            current_url = self.to_visit.pop()

            # 跳过已访问的URL
            if current_url in self.visited:
                continue

            # 标记为已访问
            self.visited.add(current_url)

            # 抓取页面
            new_links = self.crawl_page(current_url)

            # 添加新链接到待访问列表
            for link in new_links:
                if link not in self.visited:
                    self.to_visit.add(link)

            # 显示进度
            print(f"进度: 已访问 {len(self.visited)}/{self.max_pages} 个页面")

        print(f"\n爬取完成!总共访问 {len(self.visited)} 个页面")
        return self.results

    def save_results(self, filename='crawl_results.json'):
        """保存结果"""
        import json

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, ensure_ascii=False, indent=2)

        print(f"结果已保存到: {filename}")

# 使用示例(注意:实际使用请遵守robots.txt和网站政策)
# crawler = SimpleWebCrawler('https://httpbin.org', max_pages=3, delay=2)
# results = crawler.crawl()
# crawler.save_results()
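
上面的爬虫会跟随页面里出现的所有链接,很容易爬到别的网站上去。实际使用时通常会限制只抓取起始URL所在的域名,下面是一个可以加进 SimpleWebCrawler 的小改动示意:

from urllib.parse import urlparse

def is_same_domain(url, start_url):
    """判断url是否与起始URL在同一个域名下"""
    return urlparse(url).netloc == urlparse(start_url).netloc

# 在 get_all_links 过滤链接时多加一个条件即可:
# if self.is_valid_url(absolute_url) and is_same_domain(absolute_url, self.start_url):
#     links.add(absolute_url)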

8. 注意事项与最佳实践

注意事项1:遵守robots.txt

import requests
from urllib.parse import urljoin

def check_robots_txt(url):
    """检查robots.txt"""
    try:
        # 获取robots.txt
        robots_url = urljoin(url, '/robots.txt')
        response = requests.get(robots_url, timeout=5)

        if response.status_code == 200:
            print(f"找到robots.txt: {robots_url}")
            print("内容:")
            print(response.text[:500] + "..." if len(response.text) > 500 else response.text)
            return response.text
        else:
            print(f"没有找到robots.txt或无法访问: {robots_url}")
            return None

    except Exception as e:
        print(f"检查robots.txt时出错: {e}")
        return None

# 使用示例
robots_content = check_robots_txt('https://www.google.com')
if robots_content:
    # 解析robots.txt内容,判断是否允许爬取
    pass
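
上面注释里“判断是否允许爬取”这一步,可以直接交给标准库 urllib.robotparser 完成,它会读取 robots.txt 并回答某个 User-Agent 是否允许抓取某个URL:

from urllib.robotparser import RobotFileParser

rp = RobotFileParser()
rp.set_url('https://www.google.com/robots.txt')
rp.read()

# 判断指定User-Agent是否允许抓取某个路径
print(rp.can_fetch('*', 'https://www.google.com/search'))
print(rp.can_fetch('*', 'https://www.google.com/maps'))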

注意事项2:设置请求间隔

import time
import random
import requests

class PoliteRequester:
    """有礼貌的请求器,避免对服务器造成压力"""

    def __init__(self, min_delay=1, max_delay=3):
        self.min_delay = min_delay
        self.max_delay = max_delay
        self.last_request_time = 0

    def wait_if_needed(self):
        """如果需要,等待一段时间"""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time

        if time_since_last < self.min_delay:
            # 计算需要等待的时间
            wait_time = self.min_delay - time_since_last

            # 添加随机抖动
            wait_time += random.uniform(0, self.max_delay - self.min_delay)

            print(f"等待 {wait_time:.2f} 秒...")
            time.sleep(wait_time)

        self.last_request_time = time.time()

    def get(self, url, **kwargs):
        """发送GET请求"""
        self.wait_if_needed()
        return requests.get(url, **kwargs)

    def post(self, url, **kwargs):
        """发送POST请求"""
        self.wait_if_needed()
        return requests.post(url, **kwargs)

# 使用示例
requester = PoliteRequester(min_delay=2, max_delay=5)

# 连续请求多个页面
urls = ['https://httpbin.org/html', 'https://httpbin.org/json', 'https://httpbin.org/xml']

for url in urls:
    print(f"\n请求: {url}")
    response = requester.get(url)
    print(f"状态码: {response.status_code}")

最佳实践总结:

  1. 使用Session:对于需要保持会话的网站,使用requests.Session()
  2. 设置超时:为所有请求设置合理的超时时间
  3. 错误处理:妥善处理各种网络异常和HTTP错误
  4. 尊重网站:遵守robots.txt,设置合理的请求间隔
  5. 使用代理:对于需要大量请求或需要绕过限制的情况,使用代理(配置示意见下方)
  6. 监控性能:记录请求耗时和成功率,及时调整策略
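
关于第5条,requests 中使用代理只需要传入 proxies 字典即可。下面是一个示意,其中的代理地址是假设的占位值,需替换成自己可用的代理:

import requests

# 代理地址仅为示例占位,需要替换成自己可用的代理
proxies = {
    'http': 'http://127.0.0.1:7890',
    'https': 'http://127.0.0.1:7890',
}

response = requests.get('https://httpbin.org/ip', proxies=proxies, timeout=10)
print(response.json())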

requests库常用配置:

import requests

# 创建配置良好的Session
session = requests.Session()

# 配置适配器
adapter = requests.adapters.HTTPAdapter(
    pool_connections=10,    # 连接池大小
    pool_maxsize=10,       # 最大连接数
    max_retries=3,         # 最大重试次数
)

session.mount('http://', adapter)
session.mount('https://', adapter)

# 配置请求头
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Connection': 'keep-alive',
})

# 配置超时(全局):包装原始的request方法,默认带上timeout,
# 调用方显式传入timeout时以调用方为准
_original_request = session.request

def _request_with_timeout(method, url, **kwargs):
    kwargs.setdefault('timeout', 10)
    return _original_request(method, url, **kwargs)

session.request = _request_with_timeout

# 使用配置好的Session
response = session.get('https://httpbin.org/get')
print(f"状态码: {response.status_code}")

本章总结:BeautifulSoup 与 requests 库的配合使用是 Python 网页爬虫的黄金组合。requests 负责网络通信,BeautifulSoup 负责数据解析,两者结合可以构建强大而灵活的网页抓取程序。掌握这种配合使用方法,你将能够处理各种复杂的网页抓取场景,从简单的页面信息提取到复杂的分页抓取、表单提交等。记住要始终遵守网站的使用政策,设置合理的请求间隔,构建友好而高效的爬虫程序。