Simulating Browser Behavior with Requests

Simulating browser behavior is a key technique in crawler development. By disguising requests as those of a real browser, you can get past a site's anti-crawling checks and raise the success rate of data collection.

Why Simulate Browser Behavior?

Real browser behavior
  • Sends a complete set of HTTP request headers
  • Manages cookies and sessions automatically
  • Supports JavaScript rendering
  • Exhibits user-interaction patterns
  • Follows robots.txt
Default Requests behavior
  • Minimal request headers (User-Agent is python-requests)
  • Cookies must be managed manually
  • No JavaScript support
  • No user-behavior patterns
  • Easily identified as a crawler
Bypassing anti-crawler checks

Many sites identify crawlers by inspecting HTTP request headers; simulating a browser helps avoid detection.

Higher success rates

Correct request headers raise the success rate and reduce rejected requests and error pages.

Session persistence

Simulating a browser lets you stay logged in and reach pages that require authentication.

Data consistency

You receive the same data a browser would see, which matters especially for sites that rely on JavaScript rendering.

Basic Request Header Setup

HTTP request headers are a key marker of browser identity. The default headers Requests sends are minimal and easily flagged as a crawler.

Header                    | Purpose                            | Example value                                                    | Importance
User-Agent                | Identifies the browser and OS      | Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36     | Critical
Accept                    | Declares acceptable response types | text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8  | Important
Accept-Language           | Declares preferred languages       | zh-CN,zh;q=0.9,en;q=0.8                                          | Moderate
Accept-Encoding           | Declares acceptable encodings      | gzip, deflate, br                                                | Important
Connection                | Controls the connection type       | keep-alive                                                       | Moderate
Upgrade-Insecure-Requests | Asks to upgrade insecure requests  | 1                                                                | Moderate
Cache-Control             | Controls caching behavior          | max-age=0                                                        | Optional
Referer                   | Declares the originating page      | https://www.google.com/                                          | Important
Setting browser-level request headers
import requests

def get_browser_headers():
    """返回模拟Chrome浏览器的请求头"""
    return {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Cache-Control': 'max-age=0',
        'Sec-Fetch-Dest': 'document',
        'Sec-Fetch-Mode': 'navigate',
        'Sec-Fetch-Site': 'none',
        'Sec-Fetch-User': '?1',
        'Sec-Ch-Ua': '"Chromium";v="91", " Not;A Brand";v="99"',
        'Sec-Ch-Ua-Mobile': '?0',
        'Sec-Ch-Ua-Platform': '"Windows"'
    }

def test_headers_comparison():
    """Compare the effect of different request headers"""

    # Test URL
    test_url = 'https://httpbin.org/headers'

    # 1. Default Requests headers
    print("1. Default Requests headers:")
    response1 = requests.get(test_url)
    print(f"   User-Agent: {response1.json()['headers'].get('User-Agent', 'not found')}")

    # 2. Simulated browser headers
    print("\n2. Simulated browser headers:")
    headers = get_browser_headers()
    response2 = requests.get(test_url, headers=headers)
    returned_headers = response2.json()['headers']
    print(f"   User-Agent: {returned_headers.get('User-Agent', 'not found')}")
    print(f"   Accept: {returned_headers.get('Accept', 'not found')}")
    print(f"   Looks like a real browser: {'python-requests' not in returned_headers.get('User-Agent', '')}")

    return response2

# Usage example
if __name__ == "__main__":
    response = test_headers_comparison()
    print(f"\nNumber of headers the server saw: {len(response.json()['headers'])}")

User-Agent Management

User-Agent is the single most important header for identifying a browser. Using realistic, varied User-Agents substantially improves a crawler's success rate.

Example User-Agent pool
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36
Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15
Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0
Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36
Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1
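
At its core, rotating User-Agents is one line: pick a pool entry per request. A minimal sketch before the full manager below (the two-entry pool is illustrative only):

import random
import requests

USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
]

# One random User-Agent per request
response = requests.get(
    'https://httpbin.org/user-agent',
    headers={'User-Agent': random.choice(USER_AGENTS)},
)
print(response.json())
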
User-Agent manager
import requests
import random
import time

class UserAgentManager:
    """User-Agent manager"""

    def __init__(self):
        # User-Agent pool (should be much richer in real use)
        self.user_agents = [
            # Chrome - Windows
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36',

            # Chrome - Mac
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36',

            # Firefox
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0',
            'Mozilla/5.0 (X11; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0',

            # Safari
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1.2 Safari/605.1.15',

            # Edge
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.59',

            # Mobile
            'Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1',
            'Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.120 Mobile Safari/537.36',
            'Mozilla/5.0 (iPad; CPU OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1',
        ]

        # Usage statistics
        self.usage_stats = {ua: {'count': 0, 'last_used': 0} for ua in self.user_agents}

        # Currently active User-Agent
        self.current_ua = None

    def get_random(self):
        """Return a random User-Agent"""
        self.current_ua = random.choice(self.user_agents)
        self.usage_stats[self.current_ua]['count'] += 1
        self.usage_stats[self.current_ua]['last_used'] = time.time()
        return self.current_ua

    def get_round_robin(self):
        """Round-robin User-Agent selection (least-used first)"""
        # Sort by usage count
        sorted_agents = sorted(self.user_agents,
                              key=lambda ua: (self.usage_stats[ua]['count'],
                                             self.usage_stats[ua]['last_used']))
        self.current_ua = sorted_agents[0]
        self.usage_stats[self.current_ua]['count'] += 1
        self.usage_stats[self.current_ua]['last_used'] = time.time()
        return self.current_ua

    def get_by_browser(self, browser='chrome', os='windows', version='latest'):
        """Pick a User-Agent by browser type"""
        browser = browser.lower()
        os = os.lower()

        # Simplified matching logic (a real implementation should be smarter)
        for ua in self.user_agents:
            ua_lower = ua.lower()

            if browser in ua_lower:
                if os in ua_lower:
                    # Check the version (simplified logic)
                    if version == 'latest' or version in ua_lower:
                        return ua

        # Fall back to a random choice if nothing matches
        return self.get_random()

    def get_stats(self):
        """Return usage statistics"""
        return {
            'total_agents': len(self.user_agents),
            'usage_stats': self.usage_stats,
            'most_used': max(self.usage_stats.items(),
                           key=lambda x: x[1]['count'])[0] if self.user_agents else None,
            'least_used': min(self.usage_stats.items(),
                            key=lambda x: x[1]['count'])[0] if self.user_agents else None,
        }

class BrowserSimulator:
    """Browser simulator (with integrated User-Agent management)"""

    def __init__(self):
        self.ua_manager = UserAgentManager()
        self.session = requests.Session()
        self.setup_session()

    def setup_session(self):
        """Set the session's base request headers"""
        # Pick a User-Agent
        user_agent = self.ua_manager.get_random()

        # Set common request headers
        self.session.headers.update({
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        })

        # Apply the User-Agent
        self.session.headers['User-Agent'] = user_agent

    def rotate_user_agent(self):
        """Rotate the User-Agent"""
        new_ua = self.ua_manager.get_round_robin()
        self.session.headers['User-Agent'] = new_ua
        print(f"Switched User-Agent: {new_ua[:80]}...")
        return new_ua

    def get(self, url, **kwargs):
        """Send a GET request"""
        # Randomly decide whether to rotate the User-Agent
        if random.random() < 0.3:  # rotate with 30% probability
            self.rotate_user_agent()

        # Add a Referer (pretend we arrived from a Google search)
        if 'headers' not in kwargs:
            kwargs['headers'] = {}

        if 'Referer' not in kwargs['headers'] and random.random() < 0.5:
            kwargs['headers']['Referer'] = 'https://www.google.com/'

        return self.session.get(url, **kwargs)

    def close(self):
        """Close the session"""
        self.session.close()

# Usage example
if __name__ == "__main__":
    simulator = BrowserSimulator()

    try:
        # Run a few test requests
        test_urls = [
            'https://httpbin.org/user-agent',
            'https://httpbin.org/headers',
            'https://httpbin.org/ip',
        ]

        for i, url in enumerate(test_urls, 1):
            print(f"\nRequest {i}: {url}")

            response = simulator.get(url)

            if response.status_code == 200:
                print(f"Status code: {response.status_code}")

                if 'user-agent' in url:
                    data = response.json()
                    print(f"User-Agent the server saw: {data.get('user-agent', 'not found')}")

            # Random delay to mimic human behavior
            time.sleep(random.uniform(1, 3))

        # Inspect the statistics
        stats = simulator.ua_manager.get_stats()
        print("\nUser-Agent usage statistics:")
        print(f"Pool size: {stats['total_agents']} User-Agents")
        print(f"Most used: {stats['most_used'][:80]}...")

    finally:
        simulator.close()

Cookies and Session Management

Browsers manage cookies automatically to maintain session state. The Requests Session object lets you reproduce this behavior.

1. First visit: the browser sends a request and the server sets cookies.

2. Storing the cookies: the browser automatically saves the cookies the server returns.

3. Subsequent requests: the browser automatically sends the saved cookies.

4. Maintaining the session: the server identifies the user's session from the cookies.
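
Requests' Session object performs this entire four-step cycle on its own. A minimal sketch against httpbin.org, before the fuller cookie manager below:

import requests

session = requests.Session()

# Steps 1-2: the server sets a cookie and the Session stores it automatically
session.get('https://httpbin.org/cookies/set/session_id/abc123')

# Steps 3-4: the stored cookie is sent automatically with later requests
response = session.get('https://httpbin.org/cookies')
print(response.json())  # expected: {'cookies': {'session_id': 'abc123'}}
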

Simulating browser cookie management
import requests
import json
import time
import random

class CookieManager:
    """Cookie manager"""

    def __init__(self, session=None):
        self.session = session or requests.Session()
        self.cookie_jar = self.session.cookies
        self.cookie_history = []  # history of cookie changes

    def save_cookies(self, filename='cookies.json'):
        """Save cookies to a file"""
        cookies = []
        for cookie in self.cookie_jar:
            cookie_dict = {
                'name': cookie.name,
                'value': cookie.value,
                'domain': cookie.domain,
                'path': cookie.path,
                'expires': cookie.expires,
                'secure': cookie.secure,
                'rest': cookie._rest
            }
            cookies.append(cookie_dict)

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(cookies, f, indent=2, ensure_ascii=False)

        print(f"Saved {len(cookies)} cookies to {filename}")
        return cookies

    def load_cookies(self, filename='cookies.json'):
        """Load cookies from a file"""
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                cookies = json.load(f)

            for cookie_dict in cookies:
                # Build a Cookie object
                from http.cookiejar import Cookie

                cookie = Cookie(
                    version=0,
                    name=cookie_dict['name'],
                    value=cookie_dict['value'],
                    port=None,
                    port_specified=False,
                    domain=cookie_dict['domain'],
                    domain_specified=True,
                    domain_initial_dot=False,
                    path=cookie_dict['path'],
                    path_specified=True,
                    secure=cookie_dict['secure'],
                    expires=cookie_dict['expires'],
                    discard=False,
                    comment=None,
                    comment_url=None,
                    rest=cookie_dict.get('rest', {})
                )

                self.cookie_jar.set_cookie(cookie)

            print(f"Loaded {len(cookies)} cookies from {filename}")
            return True

        except FileNotFoundError:
            print(f"Cookie file {filename} does not exist")
            return False
        except Exception as e:
            print(f"Failed to load cookies: {e}")
            return False

    def clear_cookies(self):
        """Clear all cookies"""
        self.cookie_jar.clear()
        print("All cookies cleared")

    def get_cookie_dict(self):
        """Return the cookies as a dict"""
        return requests.utils.dict_from_cookiejar(self.cookie_jar)

    def set_cookie(self, name, value, domain=None, path='/'):
        """Set a cookie"""
        # Build a Cookie object
        from http.cookiejar import Cookie

        cookie = Cookie(
            version=0,
            name=name,
            value=value,
            port=None,
            port_specified=False,
            domain=domain or '.example.com',
            domain_specified=True,
            domain_initial_dot=False,
            path=path,
            path_specified=True,
            secure=False,
            expires=None,
            discard=False,
            comment=None,
            comment_url=None,
            rest={}
        )

        self.cookie_jar.set_cookie(cookie)

        # Record the change
        self.cookie_history.append({
            'action': 'set',
            'name': name,
            'value': value,
            'domain': domain,
            'timestamp': time.time()
        })

    def print_cookies(self):
        """Print the current cookies"""
        cookies = self.get_cookie_dict()

        if not cookies:
            print("No cookies at the moment")
            return

        print(f"Currently holding {len(cookies)} cookies:")
        for name, value in cookies.items():
            print(f"  {name}: {value}")

class SessionSimulator:
    """Session simulator (a fuller imitation of a browser session)"""

    def __init__(self, persist_cookies=True):
        self.session = requests.Session()
        self.cookie_manager = CookieManager(self.session)
        self.persist_cookies = persist_cookies

        # Set browser-level request headers
        self.setup_headers()

        # Load saved cookies (if any)
        if persist_cookies:
            self.cookie_manager.load_cookies()

    def setup_headers(self):
        """Set request headers"""
        # Base headers
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Cache-Control': 'max-age=0',
        })

    def simulate_human_behavior(self):
        """Mimic human behavior (random delays, etc.)"""
        # Random delay
        delay = random.uniform(0.5, 3.0)
        time.sleep(delay)

        # Occasional extra pause (standing in for a mouse scroll)
        if random.random() < 0.3:
            time.sleep(random.uniform(0.1, 0.5))

    def get(self, url, **kwargs):
        """Browser-like GET request"""
        # Mimic human behavior
        self.simulate_human_behavior()

        # Send the request
        response = self.session.get(url, **kwargs)

        # Log cookie changes
        if response.cookies:
            print(f"Request to {url} received {len(response.cookies)} cookies")

        # Auto-save cookies (if enabled)
        if self.persist_cookies and response.status_code == 200:
            self.cookie_manager.save_cookies()

        return response

    def post(self, url, data=None, **kwargs):
        """Browser-like POST request"""
        # Mimic human behavior
        self.simulate_human_behavior()

        # Set the Content-Type (if not already provided)
        if 'headers' not in kwargs:
            kwargs['headers'] = {}

        if 'Content-Type' not in kwargs['headers']:
            kwargs['headers']['Content-Type'] = 'application/x-www-form-urlencoded'

        # Send the request
        response = self.session.post(url, data=data, **kwargs)

        # Auto-save cookies (if enabled)
        if self.persist_cookies and response.status_code == 200:
            self.cookie_manager.save_cookies()

        return response

    def login(self, login_url, login_data, check_url=None):
        """Simulate a login flow"""
        print(f"Attempting login: {login_url}")

        # Send the login request
        response = self.post(login_url, data=login_data)

        if response.status_code == 200:
            print("Login request succeeded")

            # Verify whether the login actually worked
            if check_url:
                check_response = self.get(check_url)

                # Judge the login state from the response;
                # the check must be adapted to the specific site
                if 'login' not in check_response.url.lower():
                    print("Logged in successfully!")
                    return True
                else:
                    print("Login failed")
                    return False
            else:
                print("Login request completed (login state not verified)")
                return True
        else:
            print(f"Login failed, status code: {response.status_code}")
            return False

    def close(self):
        """Close the session and persist cookies"""
        if self.persist_cookies:
            self.cookie_manager.save_cookies()

        self.session.close()
        print("Session closed")

# Usage example
if __name__ == "__main__":
    # Create the session simulator
    simulator = SessionSimulator(persist_cookies=True)

    try:
        # Inspect the initial cookies
        print("Initial cookies:")
        simulator.cookie_manager.print_cookies()

        # Visit a test site (which sets cookies)
        print("\nVisiting the test site...")
        response = simulator.get('https://httpbin.org/cookies/set?name=value&session=abc123')

        if response.status_code == 200:
            data = response.json()
            print(f"Cookies the server saw: {data.get('cookies', {})}")

        # Inspect the updated cookies
        print("\nUpdated cookies:")
        simulator.cookie_manager.print_cookies()

        # Simulate a login flow
        print("\nSimulating a login flow...")

        # Note: httpbin.org's echo endpoint stands in for a real login endpoint
        login_success = simulator.login(
            login_url='https://httpbin.org/post',
            login_data={'username': 'test', 'password': '123456'},
            check_url='https://httpbin.org/cookies'
        )

        print(f"Login result: {'success' if login_success else 'failure'}")

    finally:
        simulator.close()

Browser Fingerprinting

Modern sites use browser fingerprinting to identify and track users, so a convincing browser simulation needs to account for these fingerprint traits.

Browser fingerprint components
User-Agent

Browser type, version, and operating-system information

Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36...
HTTP request headers

Accept, Accept-Language, Accept-Encoding, and so on

Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Plugins and MIME types

The plugins and MIME types the browser supports

application/pdf, application/x-shockwave-flash
Screen and color information

Screen resolution, color depth, time zone, etc.

Screen: 1920x1080, time zone: UTC+8
JavaScript properties

Attributes of the navigator, screen, and performance objects

navigator.userAgent, navigator.platform
Generating a browser fingerprint
import requests
import random
import platform
import time

class BrowserFingerprintGenerator:
    """Browser fingerprint generator"""

    def __init__(self):
        self.fingerprint = {}
        self.generate_fingerprint()

    def generate_fingerprint(self):
        """Generate the browser fingerprint"""
        # 1. User-Agent
        self.fingerprint['user_agent'] = self.get_user_agent()

        # 2. HTTP headers
        self.fingerprint['http_headers'] = self.get_http_headers()

        # 3. Screen information
        self.fingerprint['screen'] = self.get_screen_info()

        # 4. Time zone and language
        self.fingerprint['locale'] = self.get_locale_info()

        # 5. Other browser properties
        self.fingerprint['browser_features'] = self.get_browser_features()

    def get_user_agent(self):
        """Generate a User-Agent"""
        # Browser templates
        browsers = [
            {
                'name': 'chrome',
                'templates': [
                    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{version} Safari/537.36',
                    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{version} Safari/537.36',
                    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{version} Safari/537.36',
                ]
            },
            {
                'name': 'firefox',
                'templates': [
                    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:{version}) Gecko/20100101 Firefox/{version}',
                    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:{version}) Gecko/20100101 Firefox/{version}',
                ]
            },
            {
                'name': 'safari',
                'templates': [
                    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/{version} Safari/605.1.15',
                ]
            }
        ]

        browser = random.choice(browsers)
        template = random.choice(browser['templates'])

        # Version number
        if browser['name'] == 'chrome':
            version = f"{random.randint(85, 95)}.0.{random.randint(1000, 5000)}.{random.randint(100, 200)}"
        elif browser['name'] == 'firefox':
            version = f"{random.randint(85, 95)}.0"
        else:  # safari
            version = f"{random.randint(13, 15)}.{random.randint(0, 3)}.{random.randint(0, 3)}"

        return template.replace('{version}', version)

    def get_http_headers(self):
        """Generate HTTP request headers"""
        return {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'Accept-Language': self.get_accept_language(),
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
        }

    def get_accept_language(self):
        """Generate an Accept-Language value"""
        languages = [
            ('zh-CN', 0.9),
            ('zh', 0.8),
            ('en-US', 0.7),
            ('en', 0.6),
            ('ja', 0.5),
            ('ko', 0.4),
        ]

        # Pick 2-4 languages at random
        selected = random.sample(languages, random.randint(2, 4))
        selected.sort(key=lambda x: x[1], reverse=True)  # sort by weight

        return ', '.join([f"{lang};q={q}" for lang, q in selected])

    def get_screen_info(self):
        """Generate screen information"""
        resolutions = [
            {'width': 1920, 'height': 1080, 'depth': 24},
            {'width': 1366, 'height': 768, 'depth': 24},
            {'width': 1536, 'height': 864, 'depth': 24},
            {'width': 1440, 'height': 900, 'depth': 24},
            {'width': 1280, 'height': 720, 'depth': 24},
        ]

        return random.choice(resolutions)

    def get_locale_info(self):
        """Generate locale information"""
        timezones = ['Asia/Shanghai', 'America/New_York', 'Europe/London', 'Asia/Tokyo']
        locales = ['zh-CN', 'en-US', 'ja-JP', 'ko-KR']

        return {
            'timezone': random.choice(timezones),
            'locale': random.choice(locales),
            'language': random.choice(['zh', 'en', 'ja', 'ko']),
        }

    def get_browser_features(self):
        """Generate browser features"""
        return {
            'cookies_enabled': True,
            'do_not_track': random.choice([None, '1', '0']),
            'hardware_concurrency': random.choice([2, 4, 8, 16]),
            'device_memory': random.choice([4, 8, 16, 32]),
            'platform': platform.system(),
            'plugins': self.get_plugins(),
        }

    def get_plugins(self):
        """Generate a plugin list"""
        plugin_templates = [
            'Chrome PDF Viewer',
            'Chromium PDF Viewer',
            'Microsoft Edge PDF Viewer',
            'WebKit built-in PDF',
            'Native Client',
        ]

        # Pick 0-3 plugins at random
        num_plugins = random.randint(0, 3)
        return random.sample(plugin_templates, num_plugins)

    def apply_to_session(self, session):
        """Apply the fingerprint to a Session"""
        # Set the User-Agent
        session.headers['User-Agent'] = self.fingerprint['user_agent']

        # Set the remaining HTTP headers
        for key, value in self.fingerprint['http_headers'].items():
            session.headers[key] = value

        return session

    def to_dict(self):
        """Return the fingerprint as a dict"""
        return self.fingerprint

# Browser simulator that uses the fingerprint
class FingerprintedBrowser:
    """Simulator with a browser fingerprint attached"""

    def __init__(self):
        self.session = requests.Session()
        self.fingerprint = BrowserFingerprintGenerator()
        self.fingerprint.apply_to_session(self.session)

        # Additional setup
        self.setup_additional_headers()

    def setup_additional_headers(self):
        """Set additional request headers"""
        # Common headers that are easy to forget
        self.session.headers.update({
            'Cache-Control': 'max-age=0',
            'DNT': self.fingerprint.fingerprint['browser_features']['do_not_track'] or '1',
            'Sec-Ch-Ua': '"Chromium";v="94", "Google Chrome";v="94", ";Not A Brand";v="99"',
            'Sec-Ch-Ua-Mobile': '?0',
            'Sec-Ch-Ua-Platform': '"Windows"',
        })

    def get(self, url, **kwargs):
        """Send a GET request"""
        # Add a Referer (if not provided)
        if 'headers' not in kwargs:
            kwargs['headers'] = {}

        if 'Referer' not in kwargs['headers'] and random.random() < 0.6:
            # Pretend we arrived from a search engine
            search_engines = [
                'https://www.google.com/',
                'https://www.bing.com/',
                'https://search.yahoo.com/',
                'https://www.baidu.com/',
            ]
            kwargs['headers']['Referer'] = random.choice(search_engines)

        # Random delay, standing in for human reading time
        time.sleep(random.uniform(1.0, 3.0))

        return self.session.get(url, **kwargs)

    def get_fingerprint_report(self):
        """Return a fingerprint report"""
        fp = self.fingerprint.to_dict()

        report = []
        report.append("=" * 60)
        report.append("Browser fingerprint report")
        report.append("=" * 60)

        report.append(f"User-Agent: {fp['user_agent']}")
        report.append(f"Screen resolution: {fp['screen']['width']}x{fp['screen']['height']}")
        report.append(f"Color depth: {fp['screen']['depth']}-bit")
        report.append(f"Time zone: {fp['locale']['timezone']}")
        report.append(f"Language: {fp['locale']['language']}")
        report.append(f"Header count: {len(fp['http_headers'])}")
        report.append(f"Plugin count: {len(fp['browser_features']['plugins'])}")

        return '\n'.join(report)

# Usage example
if __name__ == "__main__":
    # Create a fingerprinted browser
    browser = FingerprintedBrowser()

    # Print the fingerprint report
    print(browser.get_fingerprint_report())

    # Test a request
    print("\nTesting a request...")
    response = browser.get('https://httpbin.org/headers')

    if response.status_code == 200:
        data = response.json()
        headers = data['headers']

        print("\nHeaders the server saw:")
        print(f"User-Agent: {headers.get('User-Agent', 'not found')}")
        print(f"Accept-Language: {headers.get('Accept-Language', 'not found')}")
        print(f"Referer: {headers.get('Referer', 'not found')}")

        # Check whether we would be flagged as a crawler
        user_agent = headers.get('User-Agent', '')
        if 'python' in user_agent.lower() or 'requests' in user_agent.lower():
            print("Warning: likely to be flagged as a crawler!")
        else:
            print("Success: the headers look like a real browser")

Advanced Simulation Techniques

Countering advanced anti-crawler systems

Modern sites deploy more sophisticated anti-crawler techniques, which call for more advanced simulation methods.

Request randomization

Randomize request intervals, header order, parameter order, and so on.

Mouse behavior simulation

Simulate clicks, scrolling, hovering, and other user interactions.

JavaScript execution

Use Selenium or Pyppeteer to execute JavaScript (see the sketch after this list).

IP proxy pool

Rotate proxy IPs to avoid getting your IP banned.
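
Requests alone never runs JavaScript, so the JavaScript-execution strategy means handing the page to a real browser engine. A minimal headless-Chrome sketch; it assumes the selenium package and a matching ChromeDriver are installed, and the URL is a placeholder:

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

options = Options()
options.add_argument('--headless')  # run Chrome without a visible window
options.add_argument('--disable-blink-features=AutomationControlled')  # hide one common automation hint

driver = webdriver.Chrome(options=options)
try:
    driver.get('https://example.com/')  # placeholder URL
    html = driver.page_source           # the HTML after JavaScript has run
    print(f"Rendered HTML: {len(html)} bytes")
finally:
    driver.quit()
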

Advanced browser simulator
import requests
import time
import random
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class AdvancedBrowserSimulator:
    """Advanced browser simulator"""

    def __init__(self, use_proxy=False, proxy_pool=None):
        self.session = requests.Session()
        self.use_proxy = use_proxy
        self.proxy_pool = proxy_pool or []
        self.current_proxy = None

        # Configure the Session
        self.setup_session()

        # Behavior pattern
        self.behavior_pattern = {
            'min_delay': 0.5,
            'max_delay': 3.0,
            'scroll_probability': 0.3,
            'click_probability': 0.2,
            'back_probability': 0.1,
        }

        # Request history
        self.request_history = []

        # Current page
        self.current_page = None

    def setup_session(self):
        """Configure the Session"""
        # Set request headers
        self.setup_headers()

        # Configure the retry policy
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"]
        )

        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        # Set a proxy (if enabled)
        if self.use_proxy and self.proxy_pool:
            self.rotate_proxy()

    def setup_headers(self):
        """Set request headers"""
        # Pick a browser profile at random
        browsers = self.get_browser_templates()
        browser = random.choice(browsers)

        self.session.headers.update(browser['headers'])

        # Randomly add some optional headers
        optional_headers = {
            'DNT': random.choice(['1', '0']),
            'Save-Data': random.choice(['on', None]),
            'X-Requested-With': random.choice(['XMLHttpRequest', None]),
        }

        for key, value in optional_headers.items():
            if value:
                self.session.headers[key] = value

    def get_browser_templates(self):
        """Return browser header templates"""
        return [
            {
                'name': 'chrome_win',
                'headers': {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36',
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
                    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                    'Accept-Encoding': 'gzip, deflate, br',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1',
                    'Sec-Fetch-Dest': 'document',
                    'Sec-Fetch-Mode': 'navigate',
                    'Sec-Fetch-Site': 'none',
                    'Sec-Fetch-User': '?1',
                    'Cache-Control': 'max-age=0',
                }
            },
            {
                'name': 'firefox_mac',
                'headers': {
                    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:92.0) Gecko/20100101 Firefox/92.0',
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                    'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
                    'Accept-Encoding': 'gzip, deflate, br',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1',
                    'Cache-Control': 'max-age=0',
                    'TE': 'Trailers',
                }
            },
            {
                'name': 'edge_win',
                'headers': {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36 Edg/94.0.992.50',
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
                    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
                    'Accept-Encoding': 'gzip, deflate, br',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1',
                    'Sec-Fetch-Dest': 'document',
                    'Sec-Fetch-Mode': 'navigate',
                    'Sec-Fetch-Site': 'none',
                    'Sec-Fetch-User': '?1',
                }
            }
        ]

    def rotate_proxy(self):
        """Rotate the proxy"""
        if not self.proxy_pool:
            return

        self.current_proxy = random.choice(self.proxy_pool)

        # Set the proxy
        self.session.proxies = {
            'http': self.current_proxy,
            'https': self.current_proxy,
        }

        print(f"Switched proxy: {self.current_proxy}")

    def simulate_human_behavior(self):
        """Mimic human browsing behavior"""
        # Random delay
        delay = random.uniform(
            self.behavior_pattern['min_delay'],
            self.behavior_pattern['max_delay']
        )
        time.sleep(delay)

        # Simulate scrolling
        if random.random() < self.behavior_pattern['scroll_probability']:
            self.simulate_scroll()

        # Simulate clicking
        if random.random() < self.behavior_pattern['click_probability']:
            self.simulate_click()

        # Simulate going back
        if random.random() < self.behavior_pattern['back_probability'] and self.request_history:
            self.simulate_back()

    def simulate_scroll(self):
        """Simulate scrolling"""
        # In a real browser, scrolling can trigger extra requests;
        # here we only mimic the timing of the behavior
        scroll_delay = random.uniform(0.1, 0.5)
        time.sleep(scroll_delay)

        scroll_type = random.choice(['up', 'down'])
        print(f"Simulated scroll: {scroll_type}")

    def simulate_click(self):
        """Simulate a click"""
        click_delay = random.uniform(0.1, 0.3)
        time.sleep(click_delay)
        print("Simulated click")

    def simulate_back(self):
        """Simulate going back to the previous page"""
        if len(self.request_history) > 1:
            # Fetch the previous page from history
            prev_page = self.request_history[-2]
            print(f"Simulated back navigation to: {prev_page['url']}")

            # Short pause
            time.sleep(random.uniform(0.5, 1.0))

    def randomize_request_params(self, url, params=None):
        """Randomize request parameters"""
        parsed = urlparse(url)

        # Handle the query parameters
        query_dict = parse_qs(parsed.query)

        if params:
            query_dict.update(params)

        # Randomize parameter order
        if query_dict:
            items = list(query_dict.items())
            random.shuffle(items)
            query_dict = dict(items)

            # Rebuild the query string
            new_query = urlencode(query_dict, doseq=True)

            # Rebuild the URL
            url = urlunparse((
                parsed.scheme,
                parsed.netloc,
                parsed.path,
                parsed.params,
                new_query,
                parsed.fragment
            ))

        return url

    def get(self, url, params=None, **kwargs):
        """Browser-like GET request"""
        # Mimic human behavior
        self.simulate_human_behavior()

        # Randomize the parameters
        url = self.randomize_request_params(url, params)

        # Add a Referer (pretend we came from the previous page)
        if 'headers' not in kwargs:
            kwargs['headers'] = {}

        if 'Referer' not in kwargs['headers'] and self.request_history:
            last_request = self.request_history[-1]
            kwargs['headers']['Referer'] = last_request['url']

        # Randomly rotate the proxy
        if self.use_proxy and random.random() < 0.3:
            self.rotate_proxy()

        # Send the request
        start_time = time.time()
        response = self.session.get(url, **kwargs)
        end_time = time.time()

        # Record the request
        self.request_history.append({
            'method': 'GET',
            'url': url,
            'status_code': response.status_code,
            'response_time': end_time - start_time,
            'timestamp': start_time,
        })

        # Update the current page
        self.current_page = url

        return response

    def post(self, url, data=None, **kwargs):
        """Browser-like POST request"""
        # Mimic human behavior
        self.simulate_human_behavior()

        # Set the Content-Type
        if 'headers' not in kwargs:
            kwargs['headers'] = {}

        if 'Content-Type' not in kwargs['headers']:
            kwargs['headers']['Content-Type'] = 'application/x-www-form-urlencoded'

        # Add a Referer
        if 'Referer' not in kwargs['headers'] and self.request_history:
            last_request = self.request_history[-1]
            kwargs['headers']['Referer'] = last_request['url']

        # Send the request
        start_time = time.time()
        response = self.session.post(url, data=data, **kwargs)
        end_time = time.time()

        # Record the request
        self.request_history.append({
            'method': 'POST',
            'url': url,
            'status_code': response.status_code,
            'response_time': end_time - start_time,
            'timestamp': start_time,
        })

        return response

    def get_request_stats(self):
        """Return request statistics"""
        if not self.request_history:
            return None

        total_requests = len(self.request_history)
        successful = sum(1 for r in self.request_history if 200 <= r['status_code'] < 400)
        failed = total_requests - successful
        avg_response_time = sum(r['response_time'] for r in self.request_history) / total_requests

        return {
            'total_requests': total_requests,
            'successful': successful,
            'failed': failed,
            'success_rate': successful / total_requests if total_requests > 0 else 0,
            'avg_response_time': avg_response_time,
            'current_proxy': self.current_proxy,
        }

    def close(self):
        """Close the session"""
        self.session.close()
        print("Advanced browser simulator closed")

# Example: proxy pool
proxy_pool_example = [
    'http://proxy1.example.com:8080',
    'http://proxy2.example.com:8080',
    'http://proxy3.example.com:8080',
    'http://user:pass@proxy4.example.com:8080',
    'socks5://proxy5.example.com:1080',
]

# Usage example
if __name__ == "__main__":
    # Create the advanced browser simulator
    browser = AdvancedBrowserSimulator(use_proxy=False)  # set True to enable proxies

    try:
        # Browse several pages
        urls = [
            'https://httpbin.org/get',
            'https://httpbin.org/headers',
            'https://httpbin.org/ip',
            'https://httpbin.org/user-agent',
        ]

        for i, url in enumerate(urls, 1):
            print(f"\nBrowsing page {i}/{len(urls)}: {url}")

            response = browser.get(url)

            if response.status_code == 200:
                print(f"Status code: {response.status_code}")
                print(f"Response size: {len(response.content)} bytes")

                # For /user-agent, show the User-Agent the server saw
                if 'user-agent' in url:
                    data = response.json()
                    print(f"User-Agent the server saw: {data.get('user-agent', 'not found')}")
            else:
                print(f"Request failed: {response.status_code}")

        # Print the statistics
        print("\n" + "=" * 60)
        print("Browsing statistics:")
        stats = browser.get_request_stats()
        if stats:
            for key, value in stats.items():
                if key == 'success_rate':
                    print(f"  {key}: {value:.2%}")
                elif key == 'avg_response_time':
                    print(f"  {key}: {value:.3f}s")
                else:
                    print(f"  {key}: {value}")

    finally:
        browser.close()
// Example of JavaScript-based fingerprint detection on the site's side
if (typeof navigator !== 'undefined') {
  var fingerprint = {
    userAgent: navigator.userAgent,
    platform: navigator.platform,
    language: navigator.language,
    screenWidth: screen.width,
    screenHeight: screen.height,
    timezone: Intl.DateTimeFormat().resolvedOptions().timeZone
  };
  // Send the fingerprint to the server for verification
  fetch('/api/fingerprint', {
    method: 'POST',
    body: JSON.stringify(fingerprint)
  });
}

Counter-Anti-Crawling Strategies

Stealth mode

Clear cookies periodically and use private-browsing-style request headers.

CAPTCHA solving

Integrate a CAPTCHA-recognition service to handle CAPTCHAs automatically.

Cloud browser APIs

Use hosted Puppeteer/Playwright services to execute JavaScript.

Request throttling

Control request frequency to avoid triggering anti-crawler rate limits (see the sketch after this list).
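
Request throttling on its own fits in a dozen lines. A minimal sliding-window limiter sketch; the 30-requests-per-minute budget is an arbitrary assumption:

import time

class SlidingWindowLimiter:
    """Allow at most max_requests per window_seconds, sleeping when the budget is spent."""

    def __init__(self, max_requests=30, window_seconds=60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.timestamps = []

    def wait(self):
        now = time.time()
        # Keep only the timestamps still inside the window
        self.timestamps = [t for t in self.timestamps if now - t < self.window_seconds]
        if len(self.timestamps) >= self.max_requests:
            # Sleep until the oldest request falls out of the window
            time.sleep(self.window_seconds - (now - self.timestamps[0]))
        self.timestamps.append(time.time())

limiter = SlidingWindowLimiter(max_requests=30, window_seconds=60)
# Call limiter.wait() immediately before each request, e.g.:
# limiter.wait(); response = session.get(url)
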

A combined counter-anti-crawling implementation
import requests
import time
import random

class AntiAntiCrawlerSystem:
    """Counter-anti-crawling system"""

    def __init__(self):
        self.session = requests.Session()
        self.setup_session()

        # Strategy configuration
        self.strategies = {
            'header_rotation': True,
            'request_randomization': True,
            'delay_randomization': True,
            'cookie_management': True,
            'proxy_rotation': False,  # requires a proxy pool
            'captcha_handling': False,  # requires a CAPTCHA-recognition service
        }

        # Request counters (used to detect rate limiting)
        self.request_counter = {}
        self.rate_limit_window = 60  # 60-second window

        # Blocklist records
        self.blacklist = {}

    def setup_session(self):
        """Configure the Session"""
        # Initial request headers
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
        })

    def rotate_headers(self):
        """Rotate request headers"""
        if not self.strategies['header_rotation']:
            return

        # Swap the User-Agent
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:92.0) Gecko/20100101 Firefox/92.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15',
        ]

        self.session.headers['User-Agent'] = random.choice(user_agents)

        # Occasionally vary other headers
        if random.random() < 0.3:
            self.session.headers['Accept'] = random.choice([
                'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            ])

        print(f"Rotated headers, current User-Agent: {self.session.headers['User-Agent'][:50]}...")

    def check_rate_limit(self, domain):
        """Check the rate limit"""
        current_time = time.time()

        # Drop expired entries from the counter
        if domain in self.request_counter:
            self.request_counter[domain] = [
                t for t in self.request_counter[domain]
                if current_time - t < self.rate_limit_window
            ]

        # Count requests inside the current window
        request_count = len(self.request_counter.get(domain, []))

        # Simple rate-limit policy
        if request_count > 30:  # 30 requests per minute
            print(f"Warning: request rate for {domain} is too high ({request_count}/minute)")
            return False

        # Record this request
        if domain not in self.request_counter:
            self.request_counter[domain] = []
        self.request_counter[domain].append(current_time)

        return True

    def apply_delay(self, domain):
        """Apply a delay"""
        if not self.strategies['delay_randomization']:
            return

        # Base delay
        base_delay = random.uniform(1.0, 3.0)

        # Scale the delay with request frequency
        if domain in self.request_counter:
            request_count = len([
                t for t in self.request_counter[domain]
                if time.time() - t < 60
            ])

            # The more requests, the longer the delay
            if request_count > 20:
                base_delay *= 2
            elif request_count > 10:
                base_delay *= 1.5

        # Random jitter
        jitter = random.uniform(-0.2, 0.2)
        delay = max(0.5, base_delay + jitter)

        print(f"Sleeping {delay:.2f} s")
        time.sleep(delay)

    def handle_blocked_response(self, response, url):
        """Handle a blocked response"""
        status_code = response.status_code

        # Status codes that commonly indicate anti-crawler blocking.
        # A CAPTCHA page served with status 200 is caught by the
        # content check below, not here.
        blocked_indicators = [
            (403, 'access denied'),
            (429, 'too many requests'),
            (503, 'service temporarily unavailable'),
        ]

        for code, description in blocked_indicators:
            if status_code == code:
                print(f"Possible anti-crawler response: {description}")

                # Add to the blocklist
                domain = self.extract_domain(url)
                if domain not in self.blacklist:
                    self.blacklist[domain] = []

                self.blacklist[domain].append({
                    'url': url,
                    'status_code': status_code,
                    'timestamp': time.time(),
                    'response_text': response.text[:200] if response.text else '',
                })

                # Apply a cool-down
                self.apply_cool_down(domain)

                return True

        # Look for anti-crawler hints in the response body
        if response.text:
            anti_crawler_keywords = [
                '验证码', 'captcha', 'robot', '爬虫', 'spider',
                'access denied', 'too many requests', 'blocked'
            ]

            text_lower = response.text.lower()
            for keyword in anti_crawler_keywords:
                if keyword in text_lower:
                    print(f"Response contains anti-crawler keyword: {keyword}")
                    return True

        return False

    def extract_domain(self, url):
        """Extract the domain"""
        from urllib.parse import urlparse
        parsed = urlparse(url)
        return parsed.netloc

    def apply_cool_down(self, domain):
        """Apply a cool-down period"""
        cool_down_time = random.uniform(30, 180)  # 30-180 s cool-down
        print(f"Domain {domain} entering cool-down, waiting {cool_down_time:.1f} s")
        time.sleep(cool_down_time)

        # Rotate request headers
        self.rotate_headers()

        # Clear cookies for this domain
        if self.strategies['cookie_management']:
            self.clear_domain_cookies(domain)

    def clear_domain_cookies(self, domain):
        """Clear cookies for a given domain"""
        cookies_to_keep = []

        for cookie in self.session.cookies:
            if domain not in cookie.domain:
                cookies_to_keep.append(cookie)

        # Re-set the remaining cookies
        self.session.cookies.clear()
        for cookie in cookies_to_keep:
            self.session.cookies.set_cookie(cookie)

        print(f"Cleared cookies for domain {domain}")

    def smart_request(self, method, url, **kwargs):
        """Smart request (with all strategies applied)"""
        # Extract the domain
        domain = self.extract_domain(url)

        # Check the rate limit
        if not self.check_rate_limit(domain):
            print(f"Rate limit hit: skipping request to {url}")
            return None

        # Apply a delay
        self.apply_delay(domain)

        # Randomly rotate request headers
        if self.strategies['header_rotation'] and random.random() < 0.3:
            self.rotate_headers()

        # Send the request
        try:
            if method.upper() == 'GET':
                response = self.session.get(url, **kwargs)
            elif method.upper() == 'POST':
                response = self.session.post(url, **kwargs)
            else:
                response = self.session.request(method, url, **kwargs)

            # Check whether we were blocked
            if self.handle_blocked_response(response, url):
                # For blocked GET requests, optionally retry
                if method.upper() == 'GET' and random.random() < 0.5:
                    print("Request blocked, retrying...")
                    time.sleep(random.uniform(5, 10))

                    # Rotate request headers
                    self.rotate_headers()

                    # Retry the request
                    return self.smart_request(method, url, **kwargs)
                else:
                    return None

            return response

        except requests.exceptions.RequestException as e:
            print(f"Request exception: {e}")

            # On connection errors, occasionally wait and retry
            if random.random() < 0.3:
                print("Waiting before retrying...")
                time.sleep(random.uniform(10, 30))
                return self.smart_request(method, url, **kwargs)

            return None

    def get_stats(self):
        """Return statistics"""
        stats = {
            'total_domains': len(self.request_counter),
            'total_requests': sum(len(times) for times in self.request_counter.values()),
            'blacklisted_domains': len(self.blacklist),
            'current_strategies': self.strategies,
        }

        # Per-domain request statistics
        domain_stats = {}
        for domain, times in self.request_counter.items():
            recent_requests = len([t for t in times if time.time() - t < 60])
            domain_stats[domain] = {
                'total_requests': len(times),
                'recent_requests_per_minute': recent_requests,
            }

        stats['domain_stats'] = domain_stats

        return stats

# Usage example
if __name__ == "__main__":
    # Create the counter-anti-crawling system
    anti_system = AntiAntiCrawlerSystem()

    try:
        # Run several test requests
        test_urls = [
            'https://httpbin.org/headers',
            'https://httpbin.org/ip',
            'https://httpbin.org/user-agent',
            'https://httpbin.org/delay/2',  # simulates a slow response
        ]

        for i, url in enumerate(test_urls, 1):
            print(f"\nRequest {i}/{len(test_urls)}: {url}")

            response = anti_system.smart_request('GET', url, timeout=10)

            if response and response.status_code == 200:
                print(f"Success: {response.status_code}")
                print(f"Response size: {len(response.content)} bytes")
            else:
                print("Failed or skipped")

        # Print the statistics
        print("\n" + "=" * 60)
        print("Counter-anti-crawling system statistics:")
        stats = anti_system.get_stats()

        print(f"Total domains: {stats['total_domains']}")
        print(f"Total requests: {stats['total_requests']}")
        print(f"Blocklisted domains: {stats['blacklisted_domains']}")

        print("\nPer-domain statistics:")
        for domain, domain_stat in stats['domain_stats'].items():
            print(f"  {domain}:")
            print(f"    Total requests: {domain_stat['total_requests']}")
            print(f"    Recent requests/minute: {domain_stat['recent_requests_per_minute']}")

    finally:
        # Clean up
        anti_system.session.close()

Best Practices

Best practices for simulating browser behavior
  • Always use a realistic User-Agent; never keep the python-requests default
  • Use a Session object to manage cookies and reuse connections
  • Send a complete set of HTTP headers, including Accept, Accept-Language, and so on
  • Add reasonable delays between requests; high-frequency requests trigger anti-crawler checks
  • Rotate User-Agents and request headers so a single pattern cannot be profiled
  • Handle redirects and error responses with robust error handling
  • Respect robots.txt and keep crawl rates modest (see the sketch after this list)
  • For JavaScript-rendered pages, consider Selenium or Pyppeteer
  • Use a proxy IP pool to avoid IP bans
  • Log requests and monitor success rates so you can adjust strategy promptly
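
The robots.txt point above can be enforced with the standard library before any crawling starts. A minimal sketch with urllib.robotparser; the site URL and crawler name are placeholders:

from urllib.robotparser import RobotFileParser

rp = RobotFileParser()
rp.set_url('https://example.com/robots.txt')  # placeholder site
rp.read()  # fetch and parse robots.txt

user_agent = 'MyCrawler/1.0'  # hypothetical crawler name
if rp.can_fetch(user_agent, 'https://example.com/some/page'):
    print('robots.txt allows this URL')
else:
    print('robots.txt disallows this URL')
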
Production best-practice example
import requests
import time
import random
import logging
from typing import Optional, Dict, Any
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class ProductionBrowserSimulator:
    """Production-grade browser simulator"""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        # Default configuration
        self.config = {
            'user_agent_pool': self.get_default_user_agents(),
            'min_delay': 1.0,
            'max_delay': 5.0,
            'max_retries': 3,
            'timeout': 30,
            'use_proxy': False,
            'proxy_pool': [],
            'respect_robots': True,  # note: flag only, not enforced in this example
            'max_requests_per_domain_per_minute': 30,
        }

        # Merge in the user's configuration
        if config:
            self.config.update(config)

        # Initialize
        self.session = self.create_session()
        self.request_stats = {}
        self.domain_timestamps = {}

        logger.info("Production browser simulator initialized")

    def get_default_user_agents(self):
        """Return the default User-Agent pool"""
        return [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:92.0) Gecko/20100101 Firefox/92.0',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36',
        ]

    def create_session(self):
        """Create a fully configured Session"""
        session = requests.Session()

        # Set the retry policy
        retry_strategy = Retry(
            total=self.config['max_retries'],
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"],
            respect_retry_after_header=True
        )

        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        # Set default request headers
        session.headers.update(self.get_browser_headers())

        # Set a proxy (if enabled)
        if self.config['use_proxy'] and self.config['proxy_pool']:
            self.setup_proxy(session)

        return session

    def get_browser_headers(self):
        """Return browser-like request headers"""
        user_agent = random.choice(self.config['user_agent_pool'])

        return {
            'User-Agent': user_agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Cache-Control': 'max-age=0',
            'DNT': '1',
        }

    def setup_proxy(self, session):
        """Set a proxy"""
        proxy = random.choice(self.config['proxy_pool'])
        session.proxies = {
            'http': proxy,
            'https': proxy,
        }
        logger.info(f"Using proxy: {proxy}")

    def rotate_user_agent(self):
        """Rotate the User-Agent"""
        new_ua = random.choice(self.config['user_agent_pool'])
        self.session.headers['User-Agent'] = new_ua
        logger.debug(f"Rotated User-Agent: {new_ua[:50]}...")

    def check_rate_limit(self, domain: str) -> bool:
        """Check the rate limit"""
        current_time = time.time()

        # Drop records older than one minute
        if domain in self.domain_timestamps:
            self.domain_timestamps[domain] = [
                t for t in self.domain_timestamps[domain]
                if current_time - t < 60
            ]

        # Check whether the limit is exceeded
        recent_requests = len(self.domain_timestamps.get(domain, []))
        if recent_requests >= self.config['max_requests_per_domain_per_minute']:
            logger.warning(f"Rate limit for {domain}: {recent_requests}/minute")
            return False

        # Record this request
        if domain not in self.domain_timestamps:
            self.domain_timestamps[domain] = []
        self.domain_timestamps[domain].append(current_time)

        return True

    def apply_delay(self, domain: str):
        """Apply a delay"""
        # Base delay
        delay = random.uniform(self.config['min_delay'], self.config['max_delay'])

        # Scale with request frequency
        if domain in self.domain_timestamps:
            recent_count = len([
                t for t in self.domain_timestamps[domain]
                if time.time() - t < 60
            ])

            if recent_count > 20:
                delay *= 1.5
            elif recent_count > 10:
                delay *= 1.2

        logger.debug(f"Applying delay: {delay:.2f} s")
        time.sleep(delay)

    def extract_domain(self, url: str) -> str:
        """Extract the domain"""
        from urllib.parse import urlparse
        parsed = urlparse(url)
        return parsed.netloc

    def request(self, method: str, url: str, **kwargs) -> Optional[requests.Response]:
        """Send a request (with all best practices applied)"""
        # Extract the domain
        domain = self.extract_domain(url)

        # Check the rate limit
        if not self.check_rate_limit(domain):
            return None

        # Apply a delay
        self.apply_delay(domain)

        # Randomly rotate the User-Agent (30% probability)
        if random.random() < 0.3:
            self.rotate_user_agent()

        # Add a Referer (50% probability)
        if 'headers' not in kwargs:
            kwargs['headers'] = {}

        if 'Referer' not in kwargs['headers'] and random.random() < 0.5:
            referers = [
                'https://www.google.com/',
                'https://www.baidu.com/',
                'https://www.bing.com/',
                None
            ]
            referer = random.choice(referers)
            if referer:
                kwargs['headers']['Referer'] = referer

        # Set a timeout
        if 'timeout' not in kwargs:
            kwargs['timeout'] = self.config['timeout']

        try:
            start_time = time.time()
            response = self.session.request(method, url, **kwargs)
            elapsed = time.time() - start_time

            # Record statistics
            self.record_request(domain, response.status_code, elapsed)

            # Inspect the response
            if response.status_code == 200:
                logger.info(f"Request succeeded: {method} {url} ({elapsed:.2f}s)")

                # Check whether we were flagged as a crawler
                self.check_anti_crawler(response, url)

                return response
            else:
                logger.warning(f"Request failed: {method} {url} - {response.status_code}")

                # Handle specific status codes
                if response.status_code == 429:  # Too Many Requests
                    retry_after = response.headers.get('Retry-After')
                    if retry_after:
                        try:
                            wait_time = int(retry_after)
                            logger.info(f"Received Retry-After header, waiting {wait_time} s")
                            time.sleep(wait_time)
                        except ValueError:
                            pass

                return response

        except requests.exceptions.Timeout:
            logger.error(f"Request timed out: {method} {url}")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"Request exception: {method} {url} - {e}")
            return None

    def record_request(self, domain: str, status_code: int, elapsed: float):
        """Record request statistics"""
        if domain not in self.request_stats:
            self.request_stats[domain] = {
                'total': 0,
                'success': 0,
                'failed': 0,
                'total_time': 0,
                'status_codes': {}
            }

        stats = self.request_stats[domain]
        stats['total'] += 1
        stats['total_time'] += elapsed

        if 200 <= status_code < 400:
            stats['success'] += 1
        else:
            stats['failed'] += 1

        stats['status_codes'][status_code] = stats['status_codes'].get(status_code, 0) + 1

    def check_anti_crawler(self, response: requests.Response, url: str):
        """Check for anti-crawler mechanisms"""
        # Look for anti-crawler keywords in the body
        anti_crawler_keywords = ['captcha', '验证码', 'robot', '爬虫', 'access denied']

        if response.text:
            text_lower = response.text.lower()
            for keyword in anti_crawler_keywords:
                if keyword in text_lower:
                    logger.warning(f"Anti-crawler keyword '{keyword}' detected at {url}")

                    # Trigger the response handler
                    self.handle_anti_crawler_detected(url)
                    break

    def handle_anti_crawler_detected(self, url: str):
        """Respond to detected anti-crawler measures"""
        domain = self.extract_domain(url)

        # Back off
        extra_delay = random.uniform(10, 30)
        logger.info(f"Anti-crawler detected, extra wait of {extra_delay:.1f} s")
        time.sleep(extra_delay)

        # Rotate the User-Agent
        self.rotate_user_agent()

        # Clear cookies for this domain
        self.clear_domain_cookies(domain)

    def clear_domain_cookies(self, domain: str):
        """Clear cookies for a given domain"""
        cookies_to_keep = []

        for cookie in self.session.cookies:
            if domain not in cookie.domain:
                cookies_to_keep.append(cookie)

        # Re-set the remaining cookies
        self.session.cookies.clear()
        for cookie in cookies_to_keep:
            self.session.cookies.set_cookie(cookie)

        logger.debug(f"Cleared cookies for domain {domain}")

    def get_stats(self) -> Dict[str, Any]:
        """Return statistics"""
        total_requests = 0
        total_success = 0

        for domain_stats in self.request_stats.values():
            total_requests += domain_stats['total']
            total_success += domain_stats['success']

        success_rate = total_success / total_requests if total_requests > 0 else 0

        return {
            'total_domains': len(self.request_stats),
            'total_requests': total_requests,
            'success_rate': success_rate,
            'domains': list(self.request_stats.keys()),
            'detailed_stats': self.request_stats,
        }

    def close(self):
        """Close the session"""
        self.session.close()
        logger.info("Browser simulator closed")

# Usage example
if __name__ == "__main__":
    # Configuration
    config = {
        'min_delay': 2.0,
        'max_delay': 6.0,
        'max_requests_per_domain_per_minute': 20,
    }

    # Create the browser simulator
    browser = ProductionBrowserSimulator(config)

    try:
        # Browse several pages
        urls = [
            'https://httpbin.org/headers',
            'https://httpbin.org/ip',
            'https://httpbin.org/user-agent',
            'https://httpbin.org/status/200',
            'https://httpbin.org/status/404',
        ]

        for url in urls:
            print(f"\nRequest: {url}")

            response = browser.request('GET', url)

            if response:
                print(f"Status code: {response.status_code}")
                if response.status_code == 200:
                    print(f"Success, response size: {len(response.content)} bytes")
            else:
                print("Request failed")

        # Print the statistics
        print("\n" + "=" * 60)
        print("Request statistics:")
        stats = browser.get_stats()

        print(f"Total domains: {stats['total_domains']}")
        print(f"Total requests: {stats['total_requests']}")
        print(f"Success rate: {stats['success_rate']:.1%}")

        print("\nDetailed statistics:")
        for domain, domain_stats in stats['detailed_stats'].items():
            print(f"  {domain}:")
            print(f"    Total requests: {domain_stats['total']}")
            print(f"    Succeeded: {domain_stats['success']}")
            print(f"    Failed: {domain_stats['failed']}")
            print(f"    Average response time: {domain_stats['total_time']/domain_stats['total']:.3f}s")

            # Status-code distribution
            if domain_stats['status_codes']:
                print("    Status-code distribution:")
                for code, count in domain_stats['status_codes'].items():
                    print(f"      {code}: {count}")

    finally:
        browser.close()

Summary

Simulating browser behavior is a core skill in crawler development. In this tutorial you learned:

  • Request header setup: how to send a complete set of HTTP headers that mimic a real browser
  • User-Agent management: using User-Agent pools and rotation to avoid detection
  • Session management: using the Session object to manage cookies and maintain session state
  • Browser fingerprinting: understanding and simulating browser fingerprint traits
  • Advanced simulation techniques: behavior simulation, proxy rotation, parameter randomization, and more
  • Counter-anti-crawling strategies: a combined approach to sites' anti-crawler mechanisms
  • Best practices: browser simulation in production environments

Remember: a well-behaved crawler should act like a human user, collecting data efficiently while respecting the site's resources and never placing an excessive load on the target.