Requests: Handling Response Text Content

After you send an HTTP request, the server returns a response. This chapter covers how to work with the different kinds of content a response can carry, including text, JSON, and binary data.

Response object structure

A response consists of a status line (e.g. 200 OK), headers (e.g. Content-Type: application/json), and a content body.

Response object overview

When you send an HTTP request with Requests, it returns a Response object that contains everything the server sent back.

Response basics
  • Status code - the HTTP status code (200, 404, 500, ...)
  • Response headers - the HTTP header fields returned by the server
  • Response body - the actual content data
  • Request info - details about the original request
Text content
  • response.text - the response content as a string
  • response.content - the response content as bytes
  • response.json() - the response content parsed as JSON
JSON handling
  • Automatic JSON parsing
  • Handling JSON decode errors
  • Custom JSON decoders
  • Working with complex JSON
Binary content
  • Images, PDFs, and other files
  • Byte-stream handling
  • In-memory file operations
Response handling flow
  1. Send the HTTP request - Requests sends the request to the server
  2. Receive the response - the server returns an HTTP response
  3. Build the Response object - Requests wraps the response in a Response object
  4. Process the content - handle text, JSON, or binary content as needed
A raw HTTP response looks like this:

HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Content-Length: 127
Server: nginx/1.18.0

{ "id": 1, "name": "example", "status": "success" }
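Each part of this raw response maps directly onto a Response attribute; as a quick orientation (a minimal sketch using the same httpbin endpoint as the later examples):

import requests

response = requests.get('https://httpbin.org/json')
print(response.status_code)              # status line  -> 200
print(response.headers['Content-Type'])  # header block -> 'application/json'
print(response.json())                   # body         -> parsed Python dict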

The Response object in detail

The Response object is the core of the Requests library; it carries all the information from the HTTP response.

Property/method   Example                  Description
status_code       response.status_code     HTTP status code (e.g. 200, 404, 500)
text              response.text            response content as a string
content           response.content         response content as bytes
json()            response.json()          parse a JSON response into a Python object
headers           response.headers         response headers (a case-insensitive dict)
encoding          response.encoding       encoding used to decode the content
url               response.url             final URL of the response (after redirects)
history           response.history         redirect history (a list)
cookies           response.cookies         cookies set by the server
elapsed           response.elapsed         time taken by the request
reason            response.reason          textual description of the status code
request           response.request         the corresponding request object

Basic Response object usage

import requests

# Send a request to get a Response object
response = requests.get('https://httpbin.org/json')

print("=== Basic Response info ===")
print(f"Status code: {response.status_code}")
print(f"Reason: {response.reason}")
print(f"Final URL: {response.url}")
print(f"Request method: {response.request.method}")
print(f"Request URL: {response.request.url}")

print(f"\n=== Response headers ===")
print(f"Content type: {response.headers.get('content-type')}")
print(f"Content length: {response.headers.get('content-length')}")
print(f"Server: {response.headers.get('server')}")

print(f"\n=== Response content info ===")
print(f"Encoding: {response.encoding}")
print(f"Content length (bytes): {len(response.content)}")
print(f"Content length (characters): {len(response.text)}")
print(f"Elapsed time: {response.elapsed}")

print(f"\n=== Other info ===")
print(f"Is redirect: {response.is_redirect}")
print(f"Is permanent redirect: {response.is_permanent_redirect}")
print(f"Redirect history: {len(response.history)} hop(s)")

if response.history:
    print("Redirect history details:")
    for i, resp in enumerate(response.history):
        print(f"  Redirect {i+1}: {resp.status_code} {resp.url}")

print(f"\n=== Cookies ===")
if response.cookies:
    for cookie in response.cookies:
        print(f"  {cookie.name}: {cookie.value}")
else:
    print("  No cookies")

Status code checking and validation

import requests

def check_response(response):
    """Check the response status code and handle it by category."""

    status = response.status_code

    if 200 <= status < 300:
        print(f"✓ Success ({status})")
        return True
    elif 300 <= status < 400:
        print(f"↪ Redirect ({status})")
        if status == 301:
            print("  Moved permanently")
        elif status == 302:
            print("  Found (temporary redirect)")
        elif status == 304:
            print("  Not modified")
        return True
    elif 400 <= status < 500:
        print(f"✗ Client error ({status})")
        if status == 400:
            print("  Bad request - check the request parameters")
        elif status == 401:
            print("  Unauthorized - authentication required")
        elif status == 403:
            print("  Forbidden - no permission")
        elif status == 404:
            print("  Not found")
        elif status == 429:
            print("  Too many requests - rate limited")
        return False
    elif 500 <= status < 600:
        print(f"✗ Server error ({status})")
        if status == 500:
            print("  Internal server error")
        elif status == 502:
            print("  Bad gateway")
        elif status == 503:
            print("  Service unavailable")
        elif status == 504:
            print("  Gateway timeout")
        return False
    else:
        print(f"? Unknown status code ({status})")
        return False

# Using requests' raise_for_status() method
def safe_request(url):
    """Request helper that checks the status code automatically."""
    try:
        response = requests.get(url)
        response.raise_for_status()  # raises HTTPError for 4xx/5xx status codes
        return response
    except requests.exceptions.HTTPError as http_err:
        print(f"HTTP error: {http_err}")
    except requests.exceptions.ConnectionError as conn_err:
        print(f"Connection error: {conn_err}")
    except requests.exceptions.Timeout as timeout_err:
        print(f"Timeout error: {timeout_err}")
    except requests.exceptions.RequestException as req_err:
        print(f"Request error: {req_err}")
    return None

# Exercise different status codes
test_urls = [
    'https://httpbin.org/status/200',
    'https://httpbin.org/status/301',
    'https://httpbin.org/status/404',
    'https://httpbin.org/status/500'
]

print("=== Status code tests ===")
for url in test_urls:
    print(f"\nRequesting: {url}")
    response = requests.get(url, allow_redirects=False)
    check_response(response)

# Using the safe request helper
print(f"\n=== Safe request example ===")
response = safe_request('https://httpbin.org/json')
if response:
    print(f"Request succeeded, status code: {response.status_code}")
Key properties of the Response object (a combined sketch follows this list):
  • Lazy body loading - with stream=True, the content is only read from the network when you access it
  • Streaming - large responses can be processed without holding everything in memory
  • Automatic decoding - text content is decoded based on the response headers
  • Connection reuse - when you use a Session, connections are reused
  • Context management - a with statement ensures the response is closed properly
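A minimal sketch tying several of these features together (the httpbin URL is just a convenient test endpoint; the chunk size is arbitrary):

import requests

session = requests.Session()  # connections are reused across requests

# The with statement releases the connection when done; stream=True defers
# reading the body until iter_content() is called.
with session.get('https://httpbin.org/bytes/1024', stream=True) as response:
    for chunk in response.iter_content(chunk_size=256):
        print(f"read {len(chunk)} bytes")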

Text content handling

response.text returns the response content as a string, suited to HTML, XML, plain text, and similar formats.

response.text
import requests

# Fetch a text response
response = requests.get('https://httpbin.org/html')

print(f"Content type: {response.headers.get('content-type')}")
print(f"Encoding: {response.encoding}")

# Get the text content
text_content = response.text

print(f"\nResponse text length: {len(text_content)} characters")
print(f"First 200 characters:")
print(text_content[:200] + "...")

# Inspect the content type
content_type = response.headers.get('content-type', '').lower()

if 'html' in content_type:
    print("\nThis is HTML content")
    # could be parsed further with BeautifulSoup
elif 'xml' in content_type:
    print("\nThis is XML content")
    # could be parsed further with xml.etree
elif 'plain' in content_type:
    print("\nThis is plain text content")
else:
    print(f"\nContent type: {content_type}")
Text content operations
import requests
import re

response = requests.get('https://httpbin.org/html')
text = response.text

# 1. Basic text statistics
print(f"Total characters: {len(text)}")
print(f"Lines: {len(text.splitlines())}")
print(f"Words: {len(text.split())}")

# 2. Searching the text
if 'Herman Melville' in text:
    print("Found 'Herman Melville'")

# 3. Extracting information with regular expressions
# Extract all headings (h1-h6)
titles = re.findall(r'<h[1-6][^>]*>(.*?)</h[1-6]>', text, re.IGNORECASE | re.DOTALL)

if titles:
    print(f"Found {len(titles)} headings:")
    for title in titles:
        print(f"  - {title.strip()}")

# 4. Replacing content
cleaned_text = re.sub(r'<.*?>', '', text)         # strip HTML tags
cleaned_text = re.sub(r'\s+', ' ', cleaned_text)  # collapse whitespace
cleaned_text = cleaned_text.strip()

print(f"\nFirst 100 characters of cleaned text:")
print(cleaned_text[:100] + "...")

# 5. Text analysis and statistics
words = cleaned_text.split()
word_count = {}
for word in words:
    word = word.lower()
    if len(word) > 3:  # only count words longer than 3 characters
        word_count[word] = word_count.get(word, 0) + 1

# Sort by frequency
sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True)

print(f"\nMost frequent words:")
for word, count in sorted_words[:10]:
    print(f"  {word}: {count} occurrence(s)")

Handling large text content

import requests
from io import StringIO

def process_large_text_stream(url, chunk_size=1024):
    """Process a large text response as a stream."""
    response = requests.get(url, stream=True)

    # Buffer for the accumulated text
    buffer = StringIO()
    total_size = 0
    line_count = 0

    # Read chunk by chunk
    for chunk in response.iter_content(chunk_size=chunk_size, decode_unicode=True):
        if chunk:
            buffer.write(chunk)
            total_size += len(chunk)

            # Count newlines; counting split() parts would overcount,
            # because a chunk can end in the middle of a line
            line_count += chunk.count('\n')

    # Get the complete text
    full_text = buffer.getvalue()
    buffer.close()

    print(f"Processing finished:")
    print(f"  Total size: {total_size} characters")
    print(f"  Total lines: {line_count}")
    print(f"  Content type: {response.headers.get('content-type')}")

    return full_text

def process_line_by_line(url):
    """Process text line by line (suited to text files)."""
    response = requests.get(url, stream=True)

    lines_processed = 0
    interesting_lines = []

    # Read line by line (for text responses)
    for line in response.iter_lines(decode_unicode=True):
        if line:
            lines_processed += 1

            # Example: look for lines containing certain keywords
            if 'error' in line.lower() or 'warning' in line.lower():
                interesting_lines.append(line[:100])  # keep only the first 100 characters

            # Report progress every 1000 lines
            if lines_processed % 1000 == 0:
                print(f"Processed {lines_processed} lines...")

    print(f"\nProcessing finished:")
    print(f"  Total lines: {lines_processed}")
    print(f"  Found {len(interesting_lines)} interesting lines")

    if interesting_lines:
        print("\nFirst 5 interesting lines:")
        for i, line in enumerate(interesting_lines[:5]):
            print(f"  {i+1}. {line}")

    return lines_processed

# Usage examples
print("=== Large text processing examples ===")

# Example 1: streaming
print("\n1. Streaming a large text response:")
try:
    text = process_large_text_stream('https://httpbin.org/html')
    print(f"  Got text, length: {len(text)}")
except Exception as e:
    print(f"  Error: {e}")

# Example 2: line by line (assuming a text file)
print("\n2. Line-by-line example:")
# Note: httpbin has no large plain-text endpoint; this only shows the code structure
# lines = process_line_by_line('https://example.com/large-log-file.txt')

# Example 3: download in chunks and save
def save_text_in_chunks(url, output_file, chunk_size=8192):
    """Download a text file in chunks and save it."""
    response = requests.get(url, stream=True)

    with open(output_file, 'w', encoding='utf-8') as f:
        total_written = 0

        for chunk in response.iter_content(chunk_size=chunk_size, decode_unicode=True):
            if chunk:
                f.write(chunk)
                total_written += len(chunk)

                # Show progress
                if total_written % (chunk_size * 10) == 0:
                    print(f"  Written {total_written} characters...")

    print(f"File saved: {output_file}")
    print(f"Total size: {total_written} characters")

print("\n3. Chunked save example:")
# save_text_in_chunks('https://example.com/large-file.txt', 'downloaded.txt')
Text processing notes (a BeautifulSoup sketch follows this list):
  • Memory usage - stream large files instead of loading them whole
  • Encoding issues - make sure text is decoded with the right encoding
  • Performance - heavy text processing (e.g. regular expressions) can be slow
  • HTML/XML parsing - for structured text, use a dedicated parser (BeautifulSoup, lxml)
  • Error handling - expect and handle decoding and network errors
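For structured HTML, a real parser is far more robust than regular expressions. A minimal sketch using BeautifulSoup (requires pip install beautifulsoup4; it reuses the httpbin page from the examples above):

import requests
from bs4 import BeautifulSoup

response = requests.get('https://httpbin.org/html')
soup = BeautifulSoup(response.text, 'html.parser')

# Extract the first heading and the tag-stripped text
h1 = soup.find('h1')
print(h1.get_text() if h1 else 'no <h1> found')
print(soup.get_text()[:100])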

JSON content handling

The response.json() method parses a JSON response into a Python object (a dict or a list).

Basic JSON parsing

import requests
import json

# Fetch a JSON response
response = requests.get('https://httpbin.org/json')

print(f"Content type: {response.headers.get('content-type')}")

# Method 1: automatic parsing with response.json()
try:
    data = response.json()
    print(f"✓ JSON parsed successfully")
    print(f"Data type: {type(data)}")

    # Access the JSON data
    print(f"\nJSON data structure:")
    if isinstance(data, dict):
        print(f"  Root object is a dict with {len(data)} key(s)")
        for key in data.keys():
            print(f"    - {key}")

    # Extract nested data
    if 'slideshow' in data:
        slideshow = data['slideshow']
        print(f"\nSlideshow info:")
        print(f"  Author: {slideshow.get('author')}")
        print(f"  Date: {slideshow.get('date')}")
        print(f"  Number of slides: {len(slideshow.get('slides', []))}")

        # Iterate over the slides
        for i, slide in enumerate(slideshow.get('slides', []), 1):
            print(f"  Slide {i}: {slide.get('title')}")

except ValueError as e:
    print(f"✗ JSON parse error: {e}")
except Exception as e:
    print(f"✗ Other error: {e}")

# Method 2: manual parsing (when you need more control)
print(f"\n=== Manual JSON parsing ===")
try:
    # Get the raw text
    json_text = response.text

    # Parse with the json module
    data_manual = json.loads(json_text)

    # Parse with custom hooks
    data_custom = json.loads(
        json_text,
        parse_float=float,        # float parsing function
        parse_int=int,            # int parsing function
        parse_constant=None,      # constant parsing function
        object_hook=None,         # object hook
        object_pairs_hook=None    # key/value pair hook
    )

    print(f"Manual parse succeeded, data type: {type(data_manual)}")

except json.JSONDecodeError as e:
    print(f"JSON decode error: {e}")
    print(f"Error position: line {e.lineno}, column {e.colno}")
    print(f"Context: {e.doc[max(0, e.pos - 50):e.pos + 50]}")

Advanced JSON handling

import requests
import json
from datetime import datetime
from decimal import Decimal

# A custom JSON decoder
class CustomJSONDecoder(json.JSONDecoder):
    """Custom JSON decoder."""

    def __init__(self, *args, **kwargs):
        # Install the object hook
        kwargs['object_hook'] = self.object_hook
        super().__init__(*args, **kwargs)

    def object_hook(self, obj):
        """Object hook for custom object parsing."""
        # Parse date strings
        if 'date' in obj and isinstance(obj['date'], str):
            try:
                obj['date'] = datetime.fromisoformat(obj['date'].replace('Z', '+00:00'))
            except (ValueError, AttributeError):
                pass

        # Convert amount fields (string to Decimal)
        if 'amount' in obj and isinstance(obj['amount'], (str, int, float)):
            try:
                obj['amount'] = Decimal(str(obj['amount']))
            except Exception:
                pass

        # Convert timestamps
        if 'timestamp' in obj and isinstance(obj['timestamp'], (int, float)):
            try:
                obj['timestamp'] = datetime.fromtimestamp(obj['timestamp'])
            except (ValueError, OSError):
                pass

        return obj

# Fetch JSON data
response = requests.get('https://httpbin.org/json')

try:
    # Parse with the custom decoder
    custom_data = json.loads(response.text, cls=CustomJSONDecoder)

    print("JSON parsed with the custom decoder:")
    print(json.dumps(custom_data, indent=2, default=str)[:500] + "...")

except json.JSONDecodeError as e:
    print(f"JSON decode error: {e}")

# Working with complex JSON structures
def extract_json_path(data, path):
    """Extract the value at a dotted path from JSON data."""
    keys = path.split('.')
    current = data

    for key in keys:
        if isinstance(current, dict) and key in current:
            current = current[key]
        elif isinstance(current, list):
            try:
                index = int(key)
                if 0 <= index < len(current):
                    current = current[index]
                else:
                    return None
            except ValueError:
                return None
        else:
            return None

    return current

# Example: extracting data from a complex JSON document
complex_json = {
    "users": [
        {
            "id": 1,
            "name": "张三",
            "profile": {
                "age": 25,
                "email": "zhangsan@example.com",
                "address": {
                    "city": "北京",
                    "country": "中国"
                }
            }
        },
        {
            "id": 2,
            "name": "李四",
            "profile": {
                "age": 30,
                "email": "lisi@example.com",
                "address": {
                    "city": "上海",
                    "country": "中国"
                }
            }
        }
    ]
}

print(f"\n=== JSON path extraction ===")
paths = [
    "users.0.name",
    "users.1.profile.address.city",
    "users.0.profile.email",
    "users.2.name"  # nonexistent path
]

for path in paths:
    value = extract_json_path(complex_json, path)
    print(f"{path}: {value}")

# JSON validation and formatting
def validate_and_format_json(json_text):
    """Validate and pretty-print JSON."""
    try:
        # Parse the JSON
        data = json.loads(json_text)

        # Pretty-print it
        formatted = json.dumps(
            data,
            indent=2,
            ensure_ascii=False,  # render non-ASCII (e.g. Chinese) characters as-is
            sort_keys=True,      # sort keys
            default=str          # fall back for non-serializable objects
        )

        return formatted
    except json.JSONDecodeError as e:
        return f"JSON validation failed: {e}"

# Test (the Chinese sample data deliberately exercises ensure_ascii=False)
test_json = '{"name": "张三", "age": 25, "active": true, "hobbies": ["编程", "读书"]}'
print(f"\n=== JSON validation and formatting ===")
print(validate_and_format_json(test_json))

Handling JSON API responses

import requests
import json
from typing import Dict, Any, List, Optional

class JSONAPIHandler:
    """Handler for JSON API responses."""

    @staticmethod
    def handle_api_response(response: requests.Response) -> Dict[str, Any]:
        """Process an API response into a normalized result."""

        result = {
            'success': False,
            'status_code': response.status_code,
            'data': None,
            'error': None,
            'headers': dict(response.headers),
            'url': response.url,
            'elapsed': response.elapsed.total_seconds()
        }

        # Check the status code
        if 200 <= response.status_code < 300:
            try:
                # Try to parse the JSON
                data = response.json()
                result['success'] = True
                result['data'] = data

                # Check common API response envelopes
                if isinstance(data, dict):
                    # An explicit error field wins
                    if 'error' in data:
                        result['success'] = False
                        result['error'] = data['error']
                    # Some APIs signal failure via a success flag plus a message
                    elif data.get('success') is False:
                        result['success'] = False
                        result['error'] = data.get('message')

            except ValueError as e:
                # JSON parsing failed
                result['error'] = f"JSON parsing failed: {e}"
                result['data'] = response.text[:500]  # keep part of the raw text

            except Exception as e:
                result['error'] = f"Error while processing the response: {e}"

        else:
            # Non-2xx status code
            result['error'] = f"HTTP error {response.status_code}: {response.reason}"

            # Try to extract error details from the body
            try:
                error_data = response.json()
                if isinstance(error_data, dict) and 'error' in error_data:
                    result['error'] = error_data['error']
                elif isinstance(error_data, dict) and 'message' in error_data:
                    result['error'] = error_data['message']
            except ValueError:
                result['data'] = response.text[:500]

        return result

    @staticmethod
    def extract_pagination_info(data: Dict) -> Dict:
        """Extract pagination info from an API response."""
        pagination = {
            'page': 1,
            'per_page': 20,
            'total': 0,
            'total_pages': 0,
            'has_next': False,
            'has_prev': False
        }

        # Common names for pagination fields
        pagination_fields = {
            'page': ['page', 'current_page', 'page_number'],
            'per_page': ['per_page', 'limit', 'page_size'],
            'total': ['total', 'total_count', 'total_items'],
            'total_pages': ['total_pages', 'pages'],
            'next_page': ['next_page', 'next'],
            'prev_page': ['prev_page', 'previous']
        }

        if isinstance(data, dict):
            for field, possible_names in pagination_fields.items():
                for name in possible_names:
                    if name in data:
                        pagination[field] = data[name]
                        break

            # Work out whether there is a next/previous page
            if 'total' in data and 'per_page' in data:
                total = data['total']
                per_page = data['per_page']
                current_page = data.get('page', 1)

                pagination['total_pages'] = (total + per_page - 1) // per_page
                pagination['has_next'] = current_page < pagination['total_pages']
                pagination['has_prev'] = current_page > 1

        return pagination

    @staticmethod
    def flatten_json(data: Dict, parent_key: str = '', sep: str = '.') -> Dict:
        """Flatten a nested JSON structure."""
        items = []

        for key, value in data.items():
            new_key = f"{parent_key}{sep}{key}" if parent_key else key

            if isinstance(value, dict):
                items.extend(JSONAPIHandler.flatten_json(value, new_key, sep).items())
            elif isinstance(value, list):
                # Lists become indexed keys
                for i, item in enumerate(value):
                    if isinstance(item, dict):
                        items.extend(
                            JSONAPIHandler.flatten_json(item, f"{new_key}[{i}]", sep).items()
                        )
                    else:
                        items.append((f"{new_key}[{i}]", item))
            else:
                items.append((new_key, value))

        return dict(items)

# Usage examples
print("=== JSONAPIHandler examples ===")

# Exercise the response handler
test_responses = [
    requests.get('https://httpbin.org/json'),        # success
    requests.get('https://httpbin.org/status/404'),  # 404 error
    requests.get('https://httpbin.org/status/500'),  # 500 error
]

for i, response in enumerate(test_responses):
    print(f"\nTest {i+1}: {response.url}")
    result = JSONAPIHandler.handle_api_response(response)

    print(f"  Success: {result['success']}")
    print(f"  Status code: {result['status_code']}")
    print(f"  Error: {result['error']}")

    if result['data'] and isinstance(result['data'], dict):
        print(f"  Data keys: {list(result['data'].keys())[:5]}...")

# Exercise the pagination extraction
pagination_data = {
    "page": 2,
    "per_page": 10,
    "total": 45,
    "data": [{"id": i, "name": f"Item {i}"} for i in range(11, 21)]
}

print(f"\n=== Pagination extraction ===")
pagination_info = JSONAPIHandler.extract_pagination_info(pagination_data)
print(json.dumps(pagination_info, indent=2))

# Exercise JSON flattening
nested_json = {
    "user": {
        "id": 1,
        "name": "张三",
        "profile": {
            "age": 25,
            "emails": ["zhangsan@example.com", "zs@work.com"]
        }
    },
    "orders": [
        {"id": 101, "total": 99.99},
        {"id": 102, "total": 49.99}
    ]
}

print(f"\n=== JSON flattening ===")
flattened = JSONAPIHandler.flatten_json(nested_json)
for key, value in flattened.items():
    print(f"  {key}: {value}")

Binary content handling

response.content returns the response body as bytes, suited to binary files such as images, PDFs, and audio/video.

Fetching binary content
import requests

# Fetch binary content (e.g. an image)
image_url = "https://httpbin.org/image/jpeg"
response = requests.get(image_url)

print(f"Content type: {response.headers.get('content-type')}")
print(f"Content length: {len(response.content)} bytes")

# Check whether the content is binary
content_type = response.headers.get('content-type', '').lower()

if 'image' in content_type:
    print("This is an image file")

    # Get the binary data
    image_data = response.content

    print(f"Image size: {len(image_data)} bytes")
    print(f"First 100 bytes: {image_data[:100]}")

    # Detect the image format from its magic bytes
    if image_data[:3] == b'\xff\xd8\xff':
        print("This is a JPEG image")
    elif image_data[:8] == b'\x89PNG\r\n\x1a\n':
        print("This is a PNG image")
    elif image_data[:6] == b'GIF87a' or image_data[:6] == b'GIF89a':
        print("This is a GIF image")

elif 'application/pdf' in content_type:
    print("This is a PDF file")
elif 'application/zip' in content_type:
    print("This is a ZIP archive")
elif 'video' in content_type:
    print("This is a video file")
elif 'audio' in content_type:
    print("This is an audio file")
else:
    print(f"Unknown binary format: {content_type}")
Saving binary files
import requests
import os
from pathlib import Path

def save_binary_file(url, filename=None, directory='downloads'):
    """Download a binary file and save it to disk."""

    # Create the target directory
    os.makedirs(directory, exist_ok=True)

    # Send the request
    response = requests.get(url, stream=True)

    # If no filename was given, derive one from the URL or the Content-Disposition header
    if not filename:
        # Take the filename from the URL
        filename = url.split('/')[-1]

        # Fall back to a default name if it is unusable
        if not filename or '.' not in filename:
            content_type = response.headers.get('content-type', '')
            extension = '.bin'  # default extension

            if 'jpeg' in content_type or 'jpg' in content_type:
                extension = '.jpg'
            elif 'png' in content_type:
                extension = '.png'
            elif 'pdf' in content_type:
                extension = '.pdf'
            elif 'zip' in content_type:
                extension = '.zip'

            filename = f'downloaded_file{extension}'

    # Full file path
    filepath = os.path.join(directory, filename)

    # Save the file
    with open(filepath, 'wb') as f:
        # Write large files in chunks
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)

    print(f"File saved: {filepath}")
    print(f"File size: {os.path.getsize(filepath)} bytes")

    return filepath

# Usage examples
print("=== Binary file download examples ===")

# Download a few different file types
test_files = [
    'https://httpbin.org/image/jpeg',
    'https://httpbin.org/image/png',
    'https://httpbin.org/robots.txt'  # a text file, but it can be downloaded as binary
]

for url in test_files:
    print(f"\nDownloading: {url}")
    try:
        saved_path = save_binary_file(url)
        print(f"  Saved to: {saved_path}")
    except Exception as e:
        print(f"  Download failed: {e}")

# Batch downloads
def download_files(url_list, directory='downloads'):
    """Download a list of files."""
    results = []

    for i, url in enumerate(url_list, 1):
        print(f"Downloading {i}/{len(url_list)}: {url}")

        try:
            filepath = save_binary_file(url, directory=directory)
            results.append({
                'url': url,
                'success': True,
                'filepath': filepath,
                'error': None
            })
        except Exception as e:
            results.append({
                'url': url,
                'success': False,
                'filepath': None,
                'error': str(e)
            })

    # Summarize the results
    success_count = sum(1 for r in results if r['success'])
    print(f"\nDownloads finished: {success_count}/{len(url_list)} succeeded")

    return results

In-memory handling of binary content

import requests
import io
from PIL import Image  # requires: pip install Pillow

# Process binary content in memory
def process_image_in_memory(image_url):
    """Download an image and process it entirely in memory."""

    # Download the image into memory
    response = requests.get(image_url)

    if response.status_code != 200:
        print(f"Download failed: {response.status_code}")
        return None

    # Wrap the bytes in an in-memory file object
    image_bytes = io.BytesIO(response.content)

    try:
        # Open the image with PIL (no need to write it to disk)
        img = Image.open(image_bytes)

        print(f"Image info:")
        print(f"  Format: {img.format}")
        print(f"  Mode: {img.mode}")
        print(f"  Size: {img.size}")
        print(f"  Width: {img.width} px")
        print(f"  Height: {img.height} px")

        # The image can be manipulated in memory,
        # for example to create a thumbnail
        thumbnail_size = (100, 100)
        img.thumbnail(thumbnail_size)

        # Save the thumbnail into memory
        thumbnail_bytes = io.BytesIO()
        img.save(thumbnail_bytes, format=img.format or 'JPEG')

        print(f"Thumbnail size: {len(thumbnail_bytes.getvalue())} bytes")

        return {
            'original': response.content,
            'thumbnail': thumbnail_bytes.getvalue(),
            'info': {
                'format': img.format,
                'size': img.size,
                'mode': img.mode
            }
        }

    except Exception as e:
        print(f"Image processing error: {e}")
        return None

def process_pdf_in_memory(pdf_url):
    """Process a PDF file in memory (skeleton)."""
    response = requests.get(pdf_url)

    if response.status_code != 200:
        return None

    # In-memory PDF file object
    pdf_bytes = io.BytesIO(response.content)

    print(f"PDF size: {len(response.content)} bytes")

    # PDF processing logic would go here,
    # e.g. with PyPDF2 or pdfminer

    return pdf_bytes
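
# A concrete sketch of the PDF idea above, assuming the third-party PyPDF2
# package is installed (pip install PyPDF2); this helper is illustrative,
# not part of the original example:
def count_pdf_pages_in_memory(pdf_url):
    """Download a PDF and count its pages without touching disk."""
    from PyPDF2 import PdfReader  # deferred import, since PyPDF2 is optional here
    response = requests.get(pdf_url)
    reader = PdfReader(io.BytesIO(response.content))
    return len(reader.pages)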

def extract_file_metadata(content, content_type):
    """Extract metadata from binary content."""
    metadata = {
        'size': len(content),
        'type': content_type,
        'is_binary': True
    }

    # Check the file signature (magic number)
    signatures = {
        b'\xff\xd8\xff': 'JPEG',
        b'\x89PNG\r\n\x1a\n': 'PNG',
        b'GIF87a': 'GIF87a',
        b'GIF89a': 'GIF89a',
        b'%PDF': 'PDF',
        b'PK\x03\x04': 'ZIP',
        b'\x1f\x8b\x08': 'GZIP',
        b'\x42\x4d': 'BMP',
        b'\x49\x49\x2a\x00': 'TIFF (little-endian)',
        b'\x4d\x4d\x00\x2a': 'TIFF (big-endian)'
    }

    for signature, file_type in signatures.items():
        if content[:len(signature)] == signature:
            metadata['detected_type'] = file_type
            break

    return metadata

# Usage examples
print("=== In-memory binary processing ===")

# Process an image
image_info = process_image_in_memory('https://httpbin.org/image/jpeg')
if image_info:
    print(f"Image processed, thumbnail size: {len(image_info['thumbnail'])} bytes")

# Extract file metadata
print(f"\n=== File metadata extraction ===")

test_urls = [
    ('https://httpbin.org/image/jpeg', 'image/jpeg'),
    ('https://httpbin.org/robots.txt', 'text/plain'),
]

for url, content_type in test_urls:
    response = requests.get(url)
    metadata = extract_file_metadata(response.content, content_type)

    print(f"\nURL: {url}")
    for key, value in metadata.items():
        print(f"  {key}: {value}")

# Binary data validation
def verify_binary_integrity(content, expected_size=None, checksum=None):
    """Verify the integrity of binary data."""
    result = {
        'size_ok': True,
        'checksum_ok': True,
        'errors': []
    }

    # Check the size
    if expected_size and len(content) != expected_size:
        result['size_ok'] = False
        result['errors'].append(
            f"Size mismatch: expected {expected_size}, got {len(content)}"
        )

    # Compute a checksum (toy example)
    if checksum:
        # a real implementation would use a proper digest (e.g. MD5, SHA-256)
        simple_checksum = sum(content) % 256
        if simple_checksum != checksum:
            result['checksum_ok'] = False
            result['errors'].append(
                f"Checksum mismatch: expected {checksum}, got {simple_checksum}"
            )

    return result

# Test the integrity check
print(f"\n=== Binary data integrity verification ===")
test_content = b'Hello, World!'
verification = verify_binary_integrity(
    test_content,
    expected_size=13,
    checksum=sum(test_content) % 256
)

print(f"Verification result:")
print(f"  Size OK: {verification['size_ok']}")
print(f"  Checksum OK: {verification['checksum_ok']}")
if verification['errors']:
    print(f"  Errors: {verification['errors']}")

Encoding handling

Getting the encoding right is essential for text content, especially with Chinese and other non-ASCII characters.

Detecting and setting the encoding
import requests
import chardet  # requires: pip install chardet

# Fetch a response
response = requests.get('https://httpbin.org/encoding/utf8')

print("=== Encoding info ===")
print(f"Encoding from the headers: {response.encoding}")
print(f"Content-Type header: {response.headers.get('content-type')}")

# How Requests picks the encoding for response.text:
# 1. the charset declared in the Content-Type header, if any
# 2. otherwise, for text/* content, ISO-8859-1 (the HTTP default)
# Requests does NOT apply charset detection automatically; detection is
# available separately via response.apparent_encoding (see below).

# Detect the encoding manually
if response.encoding is None:
    detected = chardet.detect(response.content)
    print(f"Detected encoding: {detected['encoding']}")
    print(f"Detection confidence: {detected['confidence']}")

    # Use the detected encoding
    response.encoding = detected['encoding']

# Look at the text content
text = response.text
print(f"\nFirst 100 characters:")
print(text[:100])
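
# Requests also ships detection of its own as response.apparent_encoding
# (backed by charset_normalizer or chardet). It is never applied
# automatically; assign it yourself when the declared encoding looks wrong:
print(f"apparent_encoding: {response.apparent_encoding}")
response.encoding = response.apparent_encoding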

# Repairing encoding problems
def fix_encoding_issues(text):
    """Fix common mojibake sequences (UTF-8 text misread as Latin-1)."""

    # Common wrong-to-right mappings
    common_fixes = {
        'Ã©': 'é',
        'Ã¨': 'è',
        'Ãª': 'ê',
        'Ã\xa0': 'à',
        'Â': '',
        # add more mappings as needed...
    }

    for wrong, right in common_fixes.items():
        text = text.replace(wrong, right)

    return text

# Test different encodings
test_urls = [
    ('https://httpbin.org/encoding/utf8', 'UTF-8'),
    # Note: httpbin has no endpoints for other encodings;
    # this only illustrates the code structure
]

for url, expected_encoding in test_urls:
    print(f"\nTest URL: {url}")
    response = requests.get(url)

    print(f"  Expected encoding: {expected_encoding}")
    print(f"  Actual encoding: {response.encoding}")

    if response.encoding and response.encoding.upper() != expected_encoding.upper():
        print(f"  ⚠ Encoding mismatch!")
        # Try to re-decode
        try:
            # Decode with the detected encoding, then re-encode/decode with the expected one
            corrected_text = response.content.decode(response.encoding).encode(expected_encoding).decode(expected_encoding)
            print(f"  First 50 characters after correction: {corrected_text[:50]}")
        except Exception as e:
            print(f"  Encoding correction failed: {e}")
Encoding handling utilities
import requests
import chardet
from typing import Optional

class EncodingHandler:
    """Utility class for encoding handling."""

    @staticmethod
    def detect_encoding(content: bytes) -> Optional[str]:
        """Detect the encoding of byte content."""
        try:
            result = chardet.detect(content)
            if result['confidence'] > 0.7:  # confidence threshold
                return result['encoding']
        except Exception:
            pass

        # Fall back to trying common encodings
        common_encodings = ['utf-8', 'gbk', 'gb2312', 'big5', 'shift_jis', 'euc-jp']

        for encoding in common_encodings:
            try:
                content.decode(encoding)
                return encoding
            except UnicodeDecodeError:
                continue

        return None

    @staticmethod
    def safe_decode(content: bytes, encoding: Optional[str] = None) -> str:
        """Decode byte content safely."""
        if encoding:
            try:
                return content.decode(encoding)
            except UnicodeDecodeError:
                pass

        # Detect automatically
        detected = EncodingHandler.detect_encoding(content)
        if detected:
            try:
                return content.decode(detected)
            except UnicodeDecodeError:
                pass

        # Last resort
        try:
            return content.decode('utf-8', errors='replace')
        except Exception:
            return content.decode('latin-1', errors='replace')

    @staticmethod
    def normalize_encoding(encoding: str) -> str:
        """Normalize an encoding name."""
        encoding = encoding.lower()

        # Map common encoding aliases
        aliases = {
            'utf8': 'utf-8',
            'utf-8-sig': 'utf-8',
            'gb2312': 'gbk',
            'cp936': 'gbk',
            'big5-hkscs': 'big5',
            'shift_jis': 'shift-jis',
            'euc_jp': 'euc-jp'
        }

        return aliases.get(encoding, encoding)

    @staticmethod
    def fix_mojibake(text: str) -> str:
        """Repair mojibake (text decoded with the wrong encoding)."""
        # Common mojibake patterns
        mojibake_patterns = [
            # UTF-8 misread as Windows-1252/Latin-1
            ('Ã©', 'é'),
            ('Ã¨', 'è'),
            ('Ãª', 'ê'),
            ('Ã«', 'ë'),
            ('Ã¡', 'á'),
            ('Ã\xa0', 'à'),
            ('Ã±', 'ñ'),
            ('Ã³', 'ó'),
            ('Ã¶', 'ö'),
            ('Ãº', 'ú'),
            ('Ã¼', 'ü'),
            ('Ã§', 'ç'),
            ('ÃŽ', 'Î'),
            ('Ã¥', 'å'),
            ('Ã¦', 'æ'),
            ('Ã¸', 'ø'),
            ('Ã¿', 'ÿ'),

            # GBK misread as UTF-8 produces similar pairs; the original
            # examples were garbled in the source, so they are omitted here
        ]

        for wrong, right in mojibake_patterns:
            text = text.replace(wrong, right)

        return text

# Usage examples
print("=== Encoding utility examples ===")

# Byte strings in several encodings
test_cases = [
    (b'Hello, World!', 'ascii'),
    (b'\xe4\xbd\xa0\xe5\xa5\xbd', 'utf-8'),  # "你好" in UTF-8
    (b'\xc4\xe3\xba\xc3', 'gbk'),            # "你好" in GBK
]

for content, expected_encoding in test_cases:
    print(f"\nTest content: {content}")
    print(f"Expected encoding: {expected_encoding}")

    # Detect the encoding
    detected = EncodingHandler.detect_encoding(content)
    print(f"Detected encoding: {detected}")

    # Decode safely
    decoded = EncodingHandler.safe_decode(content)
    print(f"Decoded result: {decoded}")

    # Normalize the encoding name
    if detected:
        normalized = EncodingHandler.normalize_encoding(detected)
        print(f"Normalized encoding: {normalized}")

# Test mojibake repair
print(f"\n=== Mojibake repair example ===")

# Simulated mojibake (UTF-8 misread as Windows-1252)
mojibake_text = "CafÃ©"  # should read "Café"
print(f"Mojibake text: {mojibake_text}")
fixed_text = EncodingHandler.fix_mojibake(mojibake_text)
print(f"Repaired: {fixed_text}")

# Handling web page encodings
def fetch_with_correct_encoding(url):
    """Fetch a web page using the correct encoding."""
    response = requests.get(url)

    # If the headers declare no encoding, or only the ISO-8859-1 default
    if not response.encoding or response.encoding.lower() == 'iso-8859-1':
        # Detect the actual encoding
        actual_encoding = EncodingHandler.detect_encoding(response.content)

        if actual_encoding:
            response.encoding = actual_encoding
            print(f"Detected actual encoding: {actual_encoding}")
        else:
            # A common default for web pages
            response.encoding = 'utf-8'
            print(f"Falling back to default encoding: utf-8")

    return response.text

# Simulated page fetch
print(f"\n=== Web page encoding handling ===")
# Note: this is illustrative only; use a real URL in practice
# html = fetch_with_correct_encoding('https://example.com')

Saving response content to files

Saving response content to a file is a common need, especially when downloading images, documents, and other resources.

Basic file saving

import requests
import os
from pathlib import Path

def save_response_to_file(response, filename, directory='downloads'):
    """Save response content to a file."""

    # Create the directory
    os.makedirs(directory, exist_ok=True)

    # Full file path
    filepath = os.path.join(directory, filename)

    # Pick the write mode based on the content type
    content_type = response.headers.get('content-type', '').lower()

    if 'text' in content_type:
        # Text file
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(response.text)
        print(f"Text file saved: {filepath}")

    else:
        # Binary file
        with open(filepath, 'wb') as f:
            f.write(response.content)
        print(f"Binary file saved: {filepath}")

    # Verify the file
    if os.path.exists(filepath):
        file_size = os.path.getsize(filepath)
        print(f"File size: {file_size} bytes")
        return filepath
    else:
        print(f"Saving the file failed")
        return None

# Usage examples
print("=== Basic file saving examples ===")

# Save a text file
response = requests.get('https://httpbin.org/html')
save_response_to_file(response, 'example.html')

# Save an image
response = requests.get('https://httpbin.org/image/png')
save_response_to_file(response, 'example.png')

# Save JSON
response = requests.get('https://httpbin.org/json')
save_response_to_file(response, 'data.json')

# Inferring a filename from the response
def get_filename_from_response(response, default='download'):
    """Derive a sensible filename from a response."""

    # From the Content-Disposition header
    content_disposition = response.headers.get('content-disposition', '')
    if 'filename=' in content_disposition:
        # Extract the filename
        import re
        match = re.search(r'filename="?([^"]+)"?', content_disposition)
        if match:
            return match.group(1)

    # From the URL
    url = response.url
    filename = url.split('/')[-1]

    # Clean up the filename
    if '?' in filename:
        filename = filename.split('?')[0]

    # If there is no extension, derive one from the content type
    if '.' not in filename:
        content_type = response.headers.get('content-type', '')

        extensions = {
            'text/html': '.html',
            'application/json': '.json',
            'image/jpeg': '.jpg',
            'image/png': '.png',
            'image/gif': '.gif',
            'application/pdf': '.pdf',
            'application/zip': '.zip',
            'text/plain': '.txt'
        }

        for ct, ext in extensions.items():
            if ct in content_type:
                filename += ext
                break

    return filename or default

# Test the filename inference
print(f"\n=== Filename inference ===")

test_responses = [
    requests.get('https://httpbin.org/html'),
    requests.get('https://httpbin.org/image/png'),
    requests.get('https://httpbin.org/json'),
]

for response in test_responses:
    filename = get_filename_from_response(response)
    print(f"URL: {response.url}")
    print(f"  Inferred filename: {filename}")
    print(f"  Content type: {response.headers.get('content-type')}")

Advanced file download features

import requests
import os
import time
from pathlib import Path
from typing import Optional, Dict, Any
from urllib.parse import urlparse

class FileDownloader:
    """Advanced file downloader."""

    def __init__(self, download_dir='downloads'):
        self.download_dir = download_dir
        os.makedirs(download_dir, exist_ok=True)

        # Download statistics
        self.stats = {
            'total_downloads': 0,
            'successful_downloads': 0,
            'failed_downloads': 0,
            'total_bytes': 0
        }

    def download(self, url: str,
                 filename: Optional[str] = None,
                 overwrite: bool = False,
                 chunk_size: int = 8192,
                 timeout: int = 30) -> Dict[str, Any]:
        """Download a single file."""

        self.stats['total_downloads'] += 1

        result = {
            'url': url,
            'success': False,
            'filename': None,
            'filepath': None,
            'size': 0,
            'time_taken': 0,
            'error': None
        }

        start_time = time.time()

        try:
            # Send the request
            response = requests.get(url, stream=True, timeout=timeout)
            response.raise_for_status()

            # Work out the filename
            if not filename:
                filename = self._get_filename(response, url)

            # Full file path
            filepath = os.path.join(self.download_dir, filename)

            # Refuse to clobber an existing file
            if os.path.exists(filepath) and not overwrite:
                result['error'] = f"File already exists: {filepath}"
                result['filepath'] = filepath
                return result

            # File size, if the server reports it
            total_size = int(response.headers.get('content-length', 0))

            # Download the file
            downloaded = 0
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:
                        f.write(chunk)
                        downloaded += len(chunk)

                        # Show progress (for large files)
                        if total_size > 0:
                            percent = (downloaded / total_size) * 100
                            if downloaded % (chunk_size * 100) == 0:
                                print(f"  Progress: {downloaded}/{total_size} bytes ({percent:.1f}%)")

            # Verify the download
            actual_size = os.path.getsize(filepath)

            if total_size > 0 and actual_size != total_size:
                result['error'] = f"Size mismatch: expected {total_size}, got {actual_size}"
                os.remove(filepath)  # delete the incomplete file
            else:
                result['success'] = True
                result['filename'] = filename
                result['filepath'] = filepath
                result['size'] = actual_size

                self.stats['successful_downloads'] += 1
                self.stats['total_bytes'] += actual_size

        except requests.exceptions.Timeout:
            result['error'] = f"Request timed out ({timeout} s)"
        except requests.exceptions.HTTPError as e:
            result['error'] = f"HTTP error: {e}"
        except requests.exceptions.ConnectionError:
            result['error'] = "Connection error"
        except requests.exceptions.RequestException as e:
            result['error'] = f"Request error: {e}"
        except IOError as e:
            result['error'] = f"File I/O error: {e}"
        except Exception as e:
            result['error'] = f"Unexpected error: {e}"

        finally:
            end_time = time.time()
            result['time_taken'] = end_time - start_time

            if not result['success']:
                self.stats['failed_downloads'] += 1

        return result

    def _get_filename(self, response: requests.Response, url: str) -> str:
        """Work out a suitable filename."""

        # Method 1: from the Content-Disposition header
        filename = self._get_filename_from_headers(response)
        if filename:
            return filename

        # Method 2: from the URL path
        parsed_url = urlparse(url)
        path = parsed_url.path

        if path and path != '/':
            filename = os.path.basename(path)
            if filename:
                return filename

        # Method 3: generate one from the content type
        content_type = response.headers.get('content-type', '').lower()

        extensions = {
            'text/html': '.html',
            'application/json': '.json',
            'image/jpeg': '.jpg',
            'image/png': '.png',
            'image/gif': '.gif',
            'application/pdf': '.pdf',
            'application/zip': '.zip',
            'text/plain': '.txt',
            'application/xml': '.xml',
            'text/css': '.css',
            'application/javascript': '.js'
        }

        ext = '.bin'
        for ct, extension in extensions.items():
            if ct in content_type:
                ext = extension
                break

        # Generate a timestamp-based filename
        timestamp = int(time.time())
        return f'download_{timestamp}{ext}'

    def _get_filename_from_headers(self, response: requests.Response) -> Optional[str]:
        """Get the filename from the response headers."""
        content_disposition = response.headers.get('content-disposition', '')

        if content_disposition:
            # Look for the filename= parameter
            import re

            # Handle filename="file.txt" and filename=file.txt
            patterns = [
                r'filename="([^"]+)"',
                r"filename='([^']+)'",
                r'filename=([^;]+)'
            ]

            for pattern in patterns:
                match = re.search(pattern, content_disposition)
                if match:
                    filename = match.group(1).strip()
                    # Clean the filename (strip paths and special characters)
                    filename = os.path.basename(filename)
                    filename = re.sub(r'[^\w\-_.]', '_', filename)
                    return filename

        return None

    def batch_download(self, urls, max_workers=3):
        """Download a list of files."""
        results = []

        # Sequential implementation (see the threaded sketch after this class)
        for i, url in enumerate(urls, 1):
            print(f"Downloading {i}/{len(urls)}: {url}")

            result = self.download(url)
            results.append(result)

            if result['success']:
                print(f"  ✓ OK: {result['filename']} ({result['size']} bytes)")
            else:
                print(f"  ✗ Failed: {result['error']}")

        return results

    def get_stats(self):
        """Return the download statistics."""
        return self.stats.copy()
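
# batch_download above is sequential; a minimal parallel variant using only
# the standard library (max_workers mirrors the unused parameter above; this
# helper is an illustration, not part of the original class):
from concurrent.futures import ThreadPoolExecutor

def parallel_download(downloader: FileDownloader, urls, max_workers=3):
    """Run FileDownloader.download concurrently over a list of URLs."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(downloader.download, urls))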

# Usage examples
print("=== Advanced file downloader examples ===")

# Create a downloader
downloader = FileDownloader('my_downloads')

# Download a single file
print("\n1. Downloading a single file:")
result = downloader.download(
    'https://httpbin.org/image/png',
    filename='test_image.png',
    overwrite=True
)

print(f"  Success: {result['success']}")
print(f"  File: {result['filename']}")
print(f"  Size: {result['size']} bytes")
print(f"  Time: {result['time_taken']:.2f} s")

# Batch download
print("\n2. Batch download test:")
urls = [
    'https://httpbin.org/html',
    'https://httpbin.org/image/jpeg',
    'https://httpbin.org/robots.txt',
    # add more URLs...
]

results = downloader.batch_download(urls[:3])  # only test the first three

print("\nDownload statistics:")
stats = downloader.get_stats()
for key, value in stats.items():
    print(f"  {key}: {value}")

# Clean up the test directory
import shutil
if os.path.exists('my_downloads'):
    shutil.rmtree('my_downloads')
    print("\nTest directory removed")

Advanced response handling

1. Response content converters

import requests
import json
import csv
import io
from typing import Any, Dict, List, Optional
from xml.etree import ElementTree

class ResponseConverter:
    """Convert response content into various formats."""

    @staticmethod
    def to_json(response: requests.Response, **kwargs) -> Any:
        """Convert to JSON."""
        try:
            return response.json(**kwargs)
        except ValueError as e:
            raise ValueError(f"JSON conversion failed: {e}")

    @staticmethod
    def to_xml(response: requests.Response) -> ElementTree.Element:
        """Convert to XML."""
        try:
            return ElementTree.fromstring(response.content)
        except ElementTree.ParseError as e:
            raise ValueError(f"XML conversion failed: {e}")

    @staticmethod
    def to_csv(response: requests.Response,
               delimiter: str = ',') -> List[Dict[str, str]]:
        """Convert to CSV (a list of dicts)."""
        try:
            # Read the text through a StringIO buffer
            text = response.text
            csv_file = io.StringIO(text)

            # Parse the CSV (the csv module is imported at the top)
            reader = csv.DictReader(csv_file, delimiter=delimiter)
            return list(reader)
        except Exception as e:
            raise ValueError(f"CSV conversion failed: {e}")

    @staticmethod
    def to_text(response: requests.Response,
                strip: bool = True) -> str:
        """Convert to text."""
        text = response.text
        if strip:
            text = text.strip()
        return text

    @staticmethod
    def to_lines(response: requests.Response) -> List[str]:
        """Convert to a list of lines."""
        text = response.text
        lines = text.splitlines()
        # Drop empty lines
        lines = [line.strip() for line in lines if line.strip()]
        return lines

    @staticmethod
    def auto_convert(response: requests.Response) -> Any:
        """Convert automatically based on the content type."""
        content_type = response.headers.get('content-type', '').lower()

        if 'json' in content_type:
            return ResponseConverter.to_json(response)
        elif 'xml' in content_type:
            return ResponseConverter.to_xml(response)
        elif 'csv' in content_type:
            return ResponseConverter.to_csv(response)
        elif 'text/html' in content_type:
            # For HTML, return the text here; BeautifulSoup could parse it further
            return ResponseConverter.to_text(response)
        elif 'text/plain' in content_type:
            return ResponseConverter.to_text(response)
        else:
            # Anything else: return the raw bytes
            return response.content

# Usage examples
print("=== Response converter examples ===")

# Exercise conversion for different content types
test_endpoints = [
    ('https://httpbin.org/json', 'application/json'),
    ('https://httpbin.org/xml', 'application/xml'),
    # httpbin has no CSV endpoint; the line below only illustrates the call
    # ('https://example.com/data.csv', 'text/csv'),
    ('https://httpbin.org/html', 'text/html'),
    ('https://httpbin.org/robots.txt', 'text/plain'),
]

for url, expected_type in test_endpoints:
    print(f"\nTest: {url}")
    print(f"Expected type: {expected_type}")

    try:
        response = requests.get(url)

        # Convert automatically
        converted = ResponseConverter.auto_convert(response)

        print(f"Converted type: {type(converted)}")

        if isinstance(converted, dict):
            print(f"  JSON keys: {list(converted.keys())[:5]}...")
        elif isinstance(converted, ElementTree.Element):
            print(f"  XML root tag: {converted.tag}")
        elif isinstance(converted, list):
            print(f"  List length: {len(converted)}")
            if converted and isinstance(converted[0], dict):
                print(f"  First row keys: {list(converted[0].keys())}")
        elif isinstance(converted, str):
            print(f"  Text length: {len(converted)} characters")
            print(f"  First 50 characters: {converted[:50]}...")
        elif isinstance(converted, bytes):
            print(f"  Binary length: {len(converted)} bytes")

    except Exception as e:
        print(f"  Error: {e}")

2. Response content analyzer

import requests
import json
from typing import Dict, Any, List
from collections import Counter
import re

class ResponseAnalyzer:
    """Analyze response content."""

    @staticmethod
    def analyze_text(response: requests.Response) -> Dict[str, Any]:
        """Analyze a text response."""
        text = response.text

        # Basic statistics
        char_count = len(text)
        line_count = len(text.splitlines())
        word_count = len(text.split())

        # Word frequency
        words = re.findall(r'\b\w+\b', text.lower())
        word_freq = Counter(words)
        top_words = word_freq.most_common(10)

        # Sentence statistics (naive implementation)
        sentences = re.split(r'[.!?]+', text)
        sentence_count = len([s for s in sentences if s.strip()])

        # Estimated reading time (assuming 200 words per minute)
        reading_time = word_count / 200  # minutes

        return {
            'character_count': char_count,
            'line_count': line_count,
            'word_count': word_count,
            'sentence_count': sentence_count,
            'top_words': top_words,
            'estimated_reading_time': f"{reading_time:.1f} min"
        }

    @staticmethod
    def analyze_json(response: requests.Response) -> Dict[str, Any]:
        """Analyze a JSON response."""
        try:
            data = response.json()

            def analyze_structure(obj, depth=0):
                """Recursively analyze the JSON structure."""
                if isinstance(obj, dict):
                    keys = list(obj.keys())
                    result = {
                        'type': 'object',
                        'key_count': len(keys),
                        'keys': keys,
                        'children': {}
                    }

                    for key, value in obj.items():
                        result['children'][key] = analyze_structure(value, depth+1)

                    return result

                elif isinstance(obj, list):
                    if obj:
                        # Analyze the first element as a sample
                        sample = analyze_structure(obj[0], depth+1)
                        return {
                            'type': 'array',
                            'length': len(obj),
                            'sample_type': sample['type'],
                            'sample': sample
                        }
                    else:
                        return {
                            'type': 'array',
                            'length': 0,
                            'sample_type': 'empty'
                        }

                else:
                    return {
                        'type': type(obj).__name__,
                        'value': str(obj)[:100]  # truncate long values
                    }

            structure = analyze_structure(data)

            return {
                'data_type': type(data).__name__,
                'structure': structure
            }

        except ValueError:
            return {'error': 'Invalid JSON'}

    @staticmethod
    def analyze_html(response: requests.Response) -> Dict[str, Any]:
        """Analyze an HTML response (simplified)."""
        text = response.text

        # Tag statistics
        tags = re.findall(r'<(\w+)[^>]*>', text)
        tag_count = Counter(tags)

        # Link statistics
        links = re.findall(r'<a[^>]*href="([^"]+)"', text, re.IGNORECASE)
        internal_links = [l for l in links if l.startswith(('/', '#'))]
        external_links = [l for l in links if l.startswith('http')]

        # Image statistics
        images = re.findall(r'<img[^>]*src="([^"]+)"', text, re.IGNORECASE)

        # Heading statistics
        headings = {}
        for level in range(1, 7):
            pattern = fr'<h{level}[^>]*>(.*?)</h{level}>'
            matches = re.findall(pattern, text, re.IGNORECASE | re.DOTALL)
            headings[f'h{level}'] = len(matches)

        return {
            'tag_count': dict(tag_count.most_common(10)),
            'total_tags': sum(tag_count.values()),
            'unique_tags': len(tag_count),
            'link_count': len(links),
            'internal_links': len(internal_links),
            'external_links': len(external_links),
            'image_count': len(images),
            'heading_count': headings
        }

    @staticmethod
    def generate_report(response: requests.Response) -> Dict[str, Any]:
        """Generate a complete response analysis report."""
        content_type = response.headers.get('content-type', '').lower()

        report = {
            'url': response.url,
            'status_code': response.status_code,
            'content_type': content_type,
            'content_length': len(response.content),
            'headers': dict(response.headers),
            'analysis': {}
        }

        # Analyze according to the content type
        if 'json' in content_type:
            report['analysis']['json'] = ResponseAnalyzer.analyze_json(response)
        elif 'html' in content_type:
            report['analysis']['html'] = ResponseAnalyzer.analyze_html(response)
        elif 'text' in content_type:
            report['analysis']['text'] = ResponseAnalyzer.analyze_text(response)

        # General analysis
        report['analysis']['general'] = {
            'encoding': response.encoding,
            'elapsed_time': str(response.elapsed),
            'is_redirect': response.is_redirect,
            'redirect_history': len(response.history)
        }

        return report

# Usage examples
print("=== Response analyzer examples ===")

# Analyze an HTML response
print("\n1. Analyzing an HTML response:")
html_response = requests.get('https://httpbin.org/html')
html_report = ResponseAnalyzer.generate_report(html_response)

print(f"URL: {html_report['url']}")
print(f"Status code: {html_report['status_code']}")
print(f"Content type: {html_report['content_type']}")

if 'html' in html_report['analysis']:
    html_analysis = html_report['analysis']['html']
    print(f"Total tags: {html_analysis.get('total_tags', 0)}")
    print(f"Links: {html_analysis.get('link_count', 0)}")
    print(f"Images: {html_analysis.get('image_count', 0)}")

# Analyze a JSON response
print("\n2. Analyzing a JSON response:")
json_response = requests.get('https://httpbin.org/json')
json_report = ResponseAnalyzer.generate_report(json_response)

print(f"URL: {json_report['url']}")
print(f"Status code: {json_report['status_code']}")

if 'json' in json_report['analysis']:
    json_analysis = json_report['analysis']['json']
    print(f"Data type: {json_analysis.get('data_type', 'unknown')}")

# Analyze a text response
print("\n3. Analyzing a text response:")
text_response = requests.get('https://httpbin.org/robots.txt')
text_report = ResponseAnalyzer.generate_report(text_response)

print(f"URL: {text_report['url']}")
print(f"Status code: {text_report['status_code']}")

if 'text' in text_report['analysis']:
    text_analysis = text_report['analysis']['text']
    print(f"Characters: {text_analysis.get('character_count', 0)}")
    print(f"Words: {text_analysis.get('word_count', 0)}")
    print(f"Lines: {text_analysis.get('line_count', 0)}")

Summary

This chapter covered how to work with response content in the Requests library:

  1. Response object - its basic attributes and methods, plus status code handling
  2. Text content - handling HTML, XML, and plain text via response.text
  3. JSON content - parsing JSON via response.json() and dealing with complex structures
  4. Binary content - handling images, PDFs, and other binary data via response.content
  5. Encoding - detecting and setting the right encoding, repairing mojibake
  6. Saving to files - persisting response content, including streamed downloads
  7. Advanced handling - response converters, analyzers, and other higher-level helpers

Best-practice recommendations:

  • Always check the response status code; use raise_for_status() to surface errors
  • Stream large files to avoid exhausting memory
  • Handle encodings carefully, especially for Chinese and other non-ASCII content
  • For API responses, implement a unified error-handling scheme
  • For downloads, consider resumable transfers and progress reporting (a Range-request sketch follows)
  • Choose the processing approach based on the content type
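
On the resumable-download point, the usual technique is an HTTP Range request. A minimal sketch, assuming the server supports ranges (it then answers 206 Partial Content; the URL and filename here are placeholders):

import os
import requests

def resume_download(url, filepath):
    """Continue a partial download from where the local file ends."""
    pos = os.path.getsize(filepath) if os.path.exists(filepath) else 0
    headers = {'Range': f'bytes={pos}-'} if pos else {}

    with requests.get(url, headers=headers, stream=True) as response:
        # 206 means the server honored the range; 200 means it started over
        mode = 'ab' if response.status_code == 206 else 'wb'
        with open(filepath, mode) as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)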