响应包含状态码、头部信息和内容主体
当使用Requests发送HTTP请求时,它会返回一个Response对象。这个对象包含了服务器返回的所有信息。
常用的内容访问方式有三种:

- response.text - 文本格式的响应内容
- response.content - 字节格式的响应内容
- response.json() - JSON格式的响应内容

Response对象是Requests库的核心,下表列出了它最常用的属性和方法,表格之后附有一个最小示例:
| 属性/方法 | 示例 | 说明 |
|---|---|---|
| status_code | response.status_code | HTTP状态码(如200, 404, 500) |
| text | response.text | 响应内容,字符串格式 |
| content | response.content | 响应内容,字节格式 |
| json() | response.json() | 解析JSON响应,返回Python对象 |
| headers | response.headers | 响应头字典 |
| encoding | response.encoding | 响应内容的编码 |
| url | response.url | 响应的最终URL(考虑重定向) |
| history | response.history | 重定向历史(列表) |
| cookies | response.cookies | 服务器设置的Cookie |
| elapsed | response.elapsed | 请求耗时 |
| reason | response.reason | 状态码的文本描述 |
| request | response.request | 对应的请求对象 |
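先看这个最小示例(以httpbin为例,打印的具体值仅作演示):

```python
import requests

# 一次请求,最常用的几个属性
response = requests.get('https://httpbin.org/get')
print(response.status_code)                   # 200
print(response.headers.get('Content-Type'))   # application/json
data = response.json()                        # 将JSON主体解析为字典
print(data['url'])
```

下面的完整示例会逐一打印表中的各项属性。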
import requests
# 发送请求获取Response对象
response = requests.get('https://httpbin.org/json')
print("=== Response对象基本信息 ===")
print(f"状态码: {response.status_code}")
print(f"状态描述: {response.reason}")
print(f"最终URL: {response.url}")
print(f"请求方法: {response.request.method}")
print(f"请求URL: {response.request.url}")
print(f"\n=== 响应头信息 ===")
print(f"内容类型: {response.headers.get('content-type')}")
print(f"内容长度: {response.headers.get('content-length')}")
print(f"服务器: {response.headers.get('server')}")
print(f"\n=== 响应内容信息 ===")
print(f"编码: {response.encoding}")
print(f"内容长度(字节): {len(response.content)}")
print(f"内容长度(字符): {len(response.text)}")
print(f"请求耗时: {response.elapsed}")
print(f"\n=== 其他信息 ===")
print(f"是否重定向: {response.is_redirect}")
print(f"是否永久重定向: {response.is_permanent_redirect}")
print(f"重定向历史: {len(response.history)} 次")
if response.history:
print("重定向历史详情:")
for i, resp in enumerate(response.history):
print(f" 重定向 {i+1}: {resp.status_code} {resp.url}")
print(f"\n=== Cookie信息 ===")
if response.cookies:
for cookie in response.cookies:
print(f" {cookie.name}: {cookie.value}")
else:
print(" 没有Cookie")
import requests
def check_response(response):
"""检查响应状态码并分类处理"""
status = response.status_code
if 200 <= status < 300:
print(f"✓ 成功 ({status})")
return True
elif 300 <= status < 400:
print(f"↪ 重定向 ({status})")
if status == 301:
print(" 永久重定向")
elif status == 302:
print(" 临时重定向")
elif status == 304:
print(" 资源未修改")
return True
elif 400 <= status < 500:
print(f"✗ 客户端错误 ({status})")
if status == 400:
print(" 错误请求 - 检查请求参数")
elif status == 401:
print(" 未授权 - 需要认证")
elif status == 403:
print(" 禁止访问 - 无权限")
elif status == 404:
print(" 资源不存在")
elif status == 429:
print(" 请求过多 - 限流")
return False
elif 500 <= status < 600:
print(f"✗ 服务器错误 ({status})")
if status == 500:
print(" 服务器内部错误")
elif status == 502:
print(" 错误的网关")
elif status == 503:
print(" 服务不可用")
elif status == 504:
print(" 网关超时")
return False
else:
print(f"?未知状态码 ({status})")
return False
# 使用requests的raise_for_status()方法
def safe_request(url):
"""安全的请求函数,自动检查状态码"""
try:
response = requests.get(url)
        response.raise_for_status()  # 状态码为4xx或5xx时抛出HTTPError(3xx不会)
return response
except requests.exceptions.HTTPError as http_err:
print(f"HTTP错误: {http_err}")
except requests.exceptions.ConnectionError as conn_err:
print(f"连接错误: {conn_err}")
except requests.exceptions.Timeout as timeout_err:
print(f"超时错误: {timeout_err}")
except requests.exceptions.RequestException as req_err:
print(f"请求错误: {req_err}")
return None
# 测试不同状态码
test_urls = [
'https://httpbin.org/status/200',
'https://httpbin.org/status/301',
'https://httpbin.org/status/404',
'https://httpbin.org/status/500'
]
print("=== 状态码测试 ===")
for url in test_urls:
print(f"\n请求: {url}")
response = requests.get(url, allow_redirects=False)
check_response(response)
# 使用安全请求函数
print(f"\n=== 安全请求示例 ===")
response = safe_request('https://httpbin.org/json')
if response:
print(f"请求成功,状态码: {response.status_code}")
response.text返回响应内容的字符串表示,适用于HTML、XML、纯文本等。
import requests
# 获取文本响应
response = requests.get('https://httpbin.org/html')
print(f"内容类型: {response.headers.get('content-type')}")
print(f"编码: {response.encoding}")
# 获取文本内容
text_content = response.text
print(f"\n响应文本长度: {len(text_content)} 字符")
print(f"前200个字符:")
print(text_content[:200] + "...")
# 检查内容类型
content_type = response.headers.get('content-type', '').lower()
if 'html' in content_type:
print("\n这是HTML内容")
# 可以进一步用BeautifulSoup解析
elif 'xml' in content_type:
print("\n这是XML内容")
# 可以进一步用xml.etree解析
elif 'plain' in content_type:
print("\n这是纯文本内容")
else:
print(f"\n内容类型: {content_type}")
import requests
import re
response = requests.get('https://httpbin.org/html')
text = response.text
# 1. 基本文本操作
print(f"总字符数: {len(text)}")
print(f"行数: {len(text.splitlines())}")
print(f"单词数: {len(text.split())}")
# 2. 搜索文本
if 'Herman Melville' in text:
print("找到 'Herman Melville'")
# 3. 使用正则表达式提取信息
# 提取所有标题
titles = re.findall(r'<h1[^>]*>(.*?)</h1>', text, re.IGNORECASE | re.DOTALL)
if titles:
print(f"找到 {len(titles)} 个标题:")
for title in titles:
print(f" - {title.strip()}")
# 4. 替换内容
cleaned_text = re.sub(r'<.*?>', '', text) # 移除HTML标签
cleaned_text = re.sub(r'\s+', ' ', cleaned_text) # 合并空白字符
cleaned_text = cleaned_text.strip()
print(f"\n清理后文本前100字符:")
print(cleaned_text[:100] + "...")
# 5. 文本分析和统计
words = cleaned_text.split()
word_count = {}
for word in words:
word = word.lower()
if len(word) > 3: # 只统计长度大于3的单词
word_count[word] = word_count.get(word, 0) + 1
# 按频率排序
sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True)
print(f"\n最常出现的单词:")
for word, count in sorted_words[:10]:
print(f" {word}: {count} 次")
import requests
from io import StringIO
def process_large_text_stream(url, chunk_size=1024):
"""流式处理大型文本内容"""
response = requests.get(url, stream=True)
# 创建字符串缓冲区
buffer = StringIO()
total_size = 0
line_count = 0
# 逐块读取
for chunk in response.iter_content(chunk_size=chunk_size, decode_unicode=True):
if chunk:
buffer.write(chunk)
total_size += len(chunk)
            # 统计换行符个数,避免跨块的同一行被重复计数
            line_count += chunk.count('\n')
# 获取完整文本
full_text = buffer.getvalue()
buffer.close()
print(f"处理完成:")
print(f" 总大小: {total_size} 字符")
print(f" 总行数: {line_count}")
print(f" 内容类型: {response.headers.get('content-type')}")
return full_text
def process_line_by_line(url):
"""逐行处理文本(适用于文本文件)"""
response = requests.get(url, stream=True)
lines_processed = 0
interesting_lines = []
# 逐行读取(适用于文本响应)
for line in response.iter_lines(decode_unicode=True):
if line:
lines_processed += 1
# 示例:查找包含特定关键词的行
if 'error' in line.lower() or 'warning' in line.lower():
interesting_lines.append(line[:100]) # 只保存前100字符
# 每处理1000行打印一次进度
if lines_processed % 1000 == 0:
print(f"已处理 {lines_processed} 行...")
print(f"\n处理完成:")
print(f" 总行数: {lines_processed}")
print(f" 发现 {len(interesting_lines)} 个有趣的行")
if interesting_lines:
print("\n前5个有趣的行:")
for i, line in enumerate(interesting_lines[:5]):
print(f" {i+1}. {line}")
return lines_processed
# 使用示例
print("=== 大型文本处理示例 ===")
# 示例1:流式处理
print("\n1. 流式处理大型文本:")
try:
text = process_large_text_stream('https://httpbin.org/html')
print(f" 获取到文本,长度: {len(text)}")
except Exception as e:
print(f" 错误: {e}")
# 示例2:逐行处理(假设是文本文件)
print("\n2. 逐行处理示例:")
# 注意:httpbin没有纯文本大文件端点,这里只是演示代码结构
# lines = process_line_by_line('https://example.com/large-log-file.txt')
# 示例3:分块处理并保存
def save_text_in_chunks(url, output_file, chunk_size=8192):
"""分块下载并保存文本文件"""
response = requests.get(url, stream=True)
with open(output_file, 'w', encoding='utf-8') as f:
total_written = 0
for chunk in response.iter_content(chunk_size=chunk_size, decode_unicode=True):
if chunk:
f.write(chunk)
total_written += len(chunk)
# 显示进度
if total_written % (chunk_size * 10) == 0:
print(f" 已写入 {total_written} 字符...")
print(f"文件保存完成: {output_file}")
print(f"总大小: {total_written} 字符")
print("\n3. 分块保存示例:")
# save_text_in_chunks('https://example.com/large-file.txt', 'downloaded.txt')
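如果想找一个真实可流式读取的端点来演练上面的逐行处理,可以试试httpbin的/stream/:n,它按行返回n条JSON记录(以下为示意,以该端点的实际行为为准):

```python
import requests
import json

# /stream/20 逐行返回20条JSON
response = requests.get('https://httpbin.org/stream/20', stream=True)
count = 0
for line in response.iter_lines(decode_unicode=True):
    if line:
        record = json.loads(line)
        count += 1
        if count <= 3:  # 只展示前3条
            print(f"第{count}条: id={record.get('id')}")
print(f"共收到 {count} 行")
```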
response.json()方法将JSON响应解析为Python对象(字典或列表)。
import requests
import json
# 获取JSON响应
response = requests.get('https://httpbin.org/json')
print(f"内容类型: {response.headers.get('content-type')}")
# 方法1:使用response.json()自动解析
try:
data = response.json()
print(f"✓ 成功解析JSON")
print(f"数据类型: {type(data)}")
# 访问JSON数据
print(f"\nJSON数据结构:")
if isinstance(data, dict):
print(f" 根对象是字典,有 {len(data)} 个键")
for key in data.keys():
print(f" - {key}")
# 提取嵌套数据
if 'slideshow' in data:
slideshow = data['slideshow']
print(f"\n幻灯片信息:")
print(f" 作者: {slideshow.get('author')}")
print(f" 日期: {slideshow.get('date')}")
print(f" 幻灯片数量: {len(slideshow.get('slides', []))}")
# 遍历幻灯片
for i, slide in enumerate(slideshow.get('slides', []), 1):
print(f" 幻灯片 {i}: {slide.get('title')}")
except ValueError as e:
print(f"✗ JSON解析错误: {e}")
except Exception as e:
print(f"✗ 其他错误: {e}")
# 方法2:手动解析(当需要更多控制时)
print(f"\n=== 手动JSON解析 ===")
try:
# 获取原始文本
json_text = response.text
# 使用json模块解析
data_manual = json.loads(json_text)
# 使用自定义参数
data_custom = json.loads(
json_text,
parse_float=float, # 浮点数解析函数
parse_int=int, # 整数解析函数
parse_constant=None, # 常量解析函数
object_hook=None, # 对象钩子函数
object_pairs_hook=None # 键值对钩子函数
)
print(f"手动解析成功,数据类型: {type(data_manual)}")
except json.JSONDecodeError as e:
print(f"JSON解码错误: {e}")
print(f"错误位置: 行{e.lineno}, 列{e.colno}")
print(f"错误片段: {e.doc[e.pos-50:e.pos+50]}")
import requests
import json
from datetime import datetime
from decimal import Decimal
# 自定义JSON解码器
class CustomJSONDecoder(json.JSONDecoder):
"""自定义JSON解码器"""
def __init__(self, *args, **kwargs):
# 添加对象钩子
kwargs['object_hook'] = self.object_hook
super().__init__(*args, **kwargs)
def object_hook(self, obj):
"""对象钩子,用于自定义对象解析"""
# 处理日期字符串
if 'date' in obj and isinstance(obj['date'], str):
try:
obj['date'] = datetime.fromisoformat(obj['date'].replace('Z', '+00:00'))
except (ValueError, AttributeError):
pass
# 处理金额字段(字符串转Decimal)
if 'amount' in obj and isinstance(obj['amount'], (str, int, float)):
try:
obj['amount'] = Decimal(str(obj['amount']))
            except Exception:
                pass
# 添加处理时间戳
if 'timestamp' in obj and isinstance(obj['timestamp'], (int, float)):
try:
obj['timestamp'] = datetime.fromtimestamp(obj['timestamp'])
except (ValueError, OSError):
pass
return obj
# 获取JSON数据
response = requests.get('https://httpbin.org/json')
try:
# 使用自定义解码器
custom_data = json.loads(response.text, cls=CustomJSONDecoder)
print("使用自定义解码器解析的JSON:")
print(json.dumps(custom_data, indent=2, default=str)[:500] + "...")
except json.JSONDecodeError as e:
print(f"JSON解码错误: {e}")
# 处理复杂JSON结构
def extract_json_path(data, path):
"""从JSON数据中提取指定路径的值"""
keys = path.split('.')
current = data
for key in keys:
if isinstance(current, dict) and key in current:
current = current[key]
elif isinstance(current, list):
try:
index = int(key)
if 0 <= index < len(current):
current = current[index]
else:
return None
except ValueError:
return None
else:
return None
return current
# 示例:从复杂JSON中提取数据
complex_json = {
"users": [
{
"id": 1,
"name": "张三",
"profile": {
"age": 25,
"email": "zhangsan@example.com",
"address": {
"city": "北京",
"country": "中国"
}
}
},
{
"id": 2,
"name": "李四",
"profile": {
"age": 30,
"email": "lisi@example.com",
"address": {
"city": "上海",
"country": "中国"
}
}
}
]
}
print(f"\n=== JSON路径提取 ===")
paths = [
"users.0.name",
"users.1.profile.address.city",
"users.0.profile.email",
"users.2.name" # 不存在的路径
]
for path in paths:
value = extract_json_path(complex_json, path)
print(f"{path}: {value}")
# JSON验证和格式化
def validate_and_format_json(json_text):
"""验证和格式化JSON"""
try:
# 解析JSON
data = json.loads(json_text)
# 格式化输出
formatted = json.dumps(
data,
indent=2,
ensure_ascii=False, # 正确显示中文
sort_keys=True, # 按键排序
default=str # 处理无法序列化的对象
)
return formatted
except json.JSONDecodeError as e:
return f"JSON验证失败: {e}"
# 测试
test_json = '{"name": "张三", "age": 25, "active": true, "hobbies": ["编程", "读书"]}'
print(f"\n=== JSON验证和格式化 ===")
print(validate_and_format_json(test_json))
import requests
import json
from typing import Dict, Any, List, Optional
class JSONAPIHandler:
"""JSON API响应处理器"""
@staticmethod
def handle_api_response(response: requests.Response) -> Dict[str, Any]:
"""处理API响应,返回标准化结果"""
result = {
'success': False,
'status_code': response.status_code,
'data': None,
'error': None,
'headers': dict(response.headers),
'url': response.url,
'elapsed': response.elapsed.total_seconds()
}
# 检查状态码
if 200 <= response.status_code < 300:
try:
# 尝试解析JSON
data = response.json()
result['success'] = True
result['data'] = data
                # 检查常见的API响应格式
                if isinstance(data, dict):
                    # 检查是否有错误字段
                    if 'error' in data:
                        result['success'] = False
                        result['error'] = data['error']
                    # 有些API用success字段标记业务层失败
                    elif data.get('success') is False:
                        result['success'] = False
                        result['error'] = data.get('message')
except ValueError as e:
# JSON解析失败
result['error'] = f"JSON解析失败: {e}"
result['data'] = response.text[:500] # 保存部分原始文本
except Exception as e:
result['error'] = f"处理响应时出错: {e}"
else:
# 非2xx状态码
result['error'] = f"HTTP错误 {response.status_code}: {response.reason}"
# 尝试获取错误详情
try:
error_data = response.json()
if isinstance(error_data, dict) and 'error' in error_data:
result['error'] = error_data['error']
elif isinstance(error_data, dict) and 'message' in error_data:
result['error'] = error_data['message']
            except ValueError:
                result['data'] = response.text[:500]
return result
@staticmethod
def extract_pagination_info(data: Dict) -> Dict:
"""从API响应中提取分页信息"""
pagination = {
'page': 1,
'per_page': 20,
'total': 0,
'total_pages': 0,
'has_next': False,
'has_prev': False
}
# 常见的分页字段名
pagination_fields = {
'page': ['page', 'current_page', 'page_number'],
'per_page': ['per_page', 'limit', 'page_size'],
'total': ['total', 'total_count', 'total_items'],
'total_pages': ['total_pages', 'pages'],
'next_page': ['next_page', 'next'],
'prev_page': ['prev_page', 'previous']
}
if isinstance(data, dict):
for field, possible_names in pagination_fields.items():
for name in possible_names:
if name in data:
pagination[field] = data[name]
break
            # 基于提取到的字段计算是否有下一页/上一页
            total = pagination['total']
            per_page = pagination['per_page']
            current_page = pagination['page']
            if total and per_page:
                pagination['total_pages'] = (total + per_page - 1) // per_page
                pagination['has_next'] = current_page < pagination['total_pages']
                pagination['has_prev'] = current_page > 1
return pagination
@staticmethod
def flatten_json(data: Dict, parent_key: str = '', sep: str = '.') -> Dict:
"""扁平化嵌套的JSON结构"""
items = []
for key, value in data.items():
new_key = f"{parent_key}{sep}{key}" if parent_key else key
if isinstance(value, dict):
items.extend(JSONAPIHandler.flatten_json(value, new_key, sep).items())
elif isinstance(value, list):
# 处理列表,转换为带索引的键
for i, item in enumerate(value):
if isinstance(item, dict):
items.extend(
JSONAPIHandler.flatten_json(item, f"{new_key}[{i}]", sep).items()
)
else:
items.append((f"{new_key}[{i}]", item))
else:
items.append((new_key, value))
return dict(items)
# 使用示例
print("=== JSON API处理器示例 ===")
# 测试API响应处理
test_responses = [
requests.get('https://httpbin.org/json'), # 成功响应
requests.get('https://httpbin.org/status/404'), # 404错误
requests.get('https://httpbin.org/status/500'), # 500错误
]
for i, response in enumerate(test_responses):
print(f"\n测试 {i+1}: {response.url}")
result = JSONAPIHandler.handle_api_response(response)
print(f" 成功: {result['success']}")
print(f" 状态码: {result['status_code']}")
print(f" 错误: {result['error']}")
if result['data'] and isinstance(result['data'], dict):
print(f" 数据键: {list(result['data'].keys())[:5]}...")
# 测试分页信息提取
pagination_data = {
"page": 2,
"per_page": 10,
"total": 45,
"data": [{"id": i, "name": f"Item {i}"} for i in range(11, 21)]
}
print(f"\n=== 分页信息提取 ===")
pagination_info = JSONAPIHandler.extract_pagination_info(pagination_data)
print(json.dumps(pagination_info, indent=2))
# 测试JSON扁平化
nested_json = {
"user": {
"id": 1,
"name": "张三",
"profile": {
"age": 25,
"emails": ["zhangsan@example.com", "zs@work.com"]
}
},
"orders": [
{"id": 101, "total": 99.99},
{"id": 102, "total": 49.99}
]
}
print(f"\n=== JSON扁平化 ===")
flattened = JSONAPIHandler.flatten_json(nested_json)
for key, value in flattened.items():
print(f" {key}: {value}")
response.content返回响应内容的字节表示,适用于图片、PDF、音视频等二进制文件。
import requests
# 获取二进制内容(如图片)
image_url = "https://httpbin.org/image/jpeg"
response = requests.get(image_url)
print(f"内容类型: {response.headers.get('content-type')}")
print(f"内容长度: {len(response.content)} 字节")
# 检查是否为二进制内容
content_type = response.headers.get('content-type', '').lower()
if 'image' in content_type:
print("这是图片文件")
# 获取二进制数据
image_data = response.content
print(f"图片大小: {len(image_data)} 字节")
print(f"前100字节: {image_data[:100]}")
# 检查图片格式
if image_data[:3] == b'\xff\xd8\xff':
print("这是JPEG格式图片")
elif image_data[:8] == b'\x89PNG\r\n\x1a\n':
print("这是PNG格式图片")
elif image_data[:6] == b'GIF87a' or image_data[:6] == b'GIF89a':
print("这是GIF格式图片")
elif 'application/pdf' in content_type:
print("这是PDF文件")
elif 'application/zip' in content_type:
print("这是ZIP压缩文件")
elif 'video' in content_type:
print("这是视频文件")
elif 'audio' in content_type:
print("这是音频文件")
else:
print(f"未知的二进制格式: {content_type}")
import requests
import os
from pathlib import Path
def save_binary_file(url, filename=None, directory='downloads'):
"""下载并保存二进制文件"""
# 创建保存目录
os.makedirs(directory, exist_ok=True)
# 发送请求
response = requests.get(url, stream=True)
# 如果没有指定文件名,从URL或Content-Disposition头获取
if not filename:
# 从URL获取文件名
filename = url.split('/')[-1]
# 如果文件名不合法,使用默认名
if not filename or '.' not in filename:
content_type = response.headers.get('content-type', '')
extension = '.bin' # 默认扩展名
if 'jpeg' in content_type or 'jpg' in content_type:
extension = '.jpg'
elif 'png' in content_type:
extension = '.png'
elif 'pdf' in content_type:
extension = '.pdf'
elif 'zip' in content_type:
extension = '.zip'
filename = f'downloaded_file{extension}'
# 完整文件路径
filepath = os.path.join(directory, filename)
# 保存文件
with open(filepath, 'wb') as f:
# 对于大文件,使用分块写入
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print(f"文件已保存: {filepath}")
print(f"文件大小: {os.path.getsize(filepath)} 字节")
return filepath
# 使用示例
print("=== 二进制文件下载示例 ===")
# 测试下载不同类型的文件
test_files = [
'https://httpbin.org/image/jpeg',
'https://httpbin.org/image/png',
'https://httpbin.org/robots.txt' # 文本文件,但可以作为二进制下载
]
for url in test_files:
print(f"\n下载: {url}")
try:
saved_path = save_binary_file(url)
print(f" 保存到: {saved_path}")
except Exception as e:
print(f" 下载失败: {e}")
# 批量下载
def download_files(url_list, directory='downloads'):
"""批量下载文件"""
results = []
for i, url in enumerate(url_list, 1):
print(f"下载 {i}/{len(url_list)}: {url}")
try:
filepath = save_binary_file(url, directory=directory)
results.append({
'url': url,
'success': True,
'filepath': filepath,
'error': None
})
except Exception as e:
results.append({
'url': url,
'success': False,
'filepath': None,
'error': str(e)
})
# 统计结果
success_count = sum(1 for r in results if r['success'])
print(f"\n下载完成: {success_count}/{len(url_list)} 成功")
return results
import requests
import io
from PIL import Image # 需要安装: pip install Pillow
# 在内存中处理二进制内容
def process_image_in_memory(image_url):
"""在内存中下载并处理图片"""
# 下载图片到内存
response = requests.get(image_url)
if response.status_code != 200:
print(f"下载失败: {response.status_code}")
return None
# 将二进制数据加载到内存文件对象
image_bytes = io.BytesIO(response.content)
try:
# 使用PIL打开图片(不需要保存到磁盘)
img = Image.open(image_bytes)
print(f"图片信息:")
print(f" 格式: {img.format}")
print(f" 模式: {img.mode}")
print(f" 尺寸: {img.size}")
print(f" 宽度: {img.width} 像素")
print(f" 高度: {img.height} 像素")
# 可以在内存中处理图片
# 例如:创建缩略图
thumbnail_size = (100, 100)
img.thumbnail(thumbnail_size)
# 保存缩略图到内存
thumbnail_bytes = io.BytesIO()
img.save(thumbnail_bytes, format=img.format or 'JPEG')
print(f"缩略图大小: {len(thumbnail_bytes.getvalue())} 字节")
return {
'original': response.content,
'thumbnail': thumbnail_bytes.getvalue(),
'info': {
'format': img.format,
'size': img.size,
'mode': img.mode
}
}
except Exception as e:
print(f"图片处理错误: {e}")
return None
def process_pdf_in_memory(pdf_url):
"""在内存中处理PDF文件(示例)"""
response = requests.get(pdf_url)
if response.status_code != 200:
return None
# 创建内存中的PDF文件对象
pdf_bytes = io.BytesIO(response.content)
print(f"PDF文件大小: {len(response.content)} 字节")
# 这里可以添加PDF处理逻辑
# 例如使用PyPDF2或pdfminer处理PDF内容
return pdf_bytes
def extract_file_metadata(content, content_type):
"""从二进制内容中提取元数据"""
metadata = {
'size': len(content),
'type': content_type,
'is_binary': True
}
# 检查文件签名(魔术数字)
signatures = {
b'\xff\xd8\xff': 'JPEG',
b'\x89PNG\r\n\x1a\n': 'PNG',
b'GIF87a': 'GIF87a',
b'GIF89a': 'GIF89a',
b'%PDF': 'PDF',
b'PK\x03\x04': 'ZIP',
b'\x1f\x8b\x08': 'GZIP',
b'\x42\x4d': 'BMP',
b'\x49\x49\x2a\x00': 'TIFF (little-endian)',
b'\x4d\x4d\x00\x2a': 'TIFF (big-endian)'
}
for signature, file_type in signatures.items():
if content[:len(signature)] == signature:
metadata['detected_type'] = file_type
break
return metadata
# 使用示例
print("=== 二进制内容内存处理 ===")
# 处理图片
image_info = process_image_in_memory('https://httpbin.org/image/jpeg')
if image_info:
print(f"图片处理完成,缩略图大小: {len(image_info['thumbnail'])} 字节")
# 提取文件元数据
print(f"\n=== 文件元数据提取 ===")
test_urls = [
('https://httpbin.org/image/jpeg', 'image/jpeg'),
('https://httpbin.org/robots.txt', 'text/plain'),
]
for url, content_type in test_urls:
response = requests.get(url)
metadata = extract_file_metadata(response.content, content_type)
print(f"\nURL: {url}")
for key, value in metadata.items():
print(f" {key}: {value}")
# 二进制数据验证
def verify_binary_integrity(content, expected_size=None, checksum=None):
"""验证二进制数据的完整性"""
result = {
'size_ok': True,
'checksum_ok': True,
'errors': []
}
# 检查大小
if expected_size and len(content) != expected_size:
result['size_ok'] = False
result['errors'].append(
f"大小不匹配: 期望{expected_size}, 实际{len(content)}"
)
# 计算校验和(简单示例)
if checksum:
# 这里可以使用更复杂的校验算法(如MD5, SHA256)
simple_checksum = sum(content) % 256
if simple_checksum != checksum:
result['checksum_ok'] = False
result['errors'].append(
f"校验和不匹配: 期望{checksum}, 实际{simple_checksum}"
)
return result
# 测试完整性验证
print(f"\n=== 二进制数据完整性验证 ===")
test_content = b'Hello, World!'
verification = verify_binary_integrity(
test_content,
expected_size=13,
checksum=sum(test_content) % 256
)
print(f"验证结果:")
print(f" 大小正确: {verification['size_ok']}")
print(f" 校验和正确: {verification['checksum_ok']}")
if verification['errors']:
print(f" 错误: {verification['errors']}")
正确的编码处理对于文本内容至关重要,特别是处理中文等非ASCII字符时。
import requests
import chardet # 需要安装: pip install chardet
# 获取响应
response = requests.get('https://httpbin.org/encoding/utf8')
print("=== 编码信息 ===")
print(f"响应头中的编码: {response.encoding}")
print(f"响应头Content-Type: {response.headers.get('content-type')}")
# Requests确定response.encoding的规则:
# 1. 优先使用HTTP头部Content-Type中的charset
# 2. 若是text/*类型但头部未指定charset,按HTTP规范默认使用ISO-8859-1
# 3. 只有response.apparent_encoding才会用chardet等库对内容做真正的检测
# 手动检测编码
if response.encoding is None:
detected = chardet.detect(response.content)
print(f"自动检测的编码: {detected['encoding']}")
print(f"检测置信度: {detected['confidence']}")
# 设置检测到的编码
response.encoding = detected['encoding']
# 查看文本内容
text = response.text
print(f"\n文本内容前100字符:")
print(text[:100])
# 处理编码问题
def fix_encoding_issues(text):
"""修复常见的编码问题"""
    # 常见错误编码映射(UTF-8字节被当作Windows-1252解码产生的乱码)
    common_fixes = {
        'Ã©': 'é',
        'Ã¨': 'è',
        'Ãª': 'ê',
        'Ã ': 'à',
        'Â': '',
        # 添加更多映射...
    }
for wrong, right in common_fixes.items():
text = text.replace(wrong, right)
return text
# 测试不同编码
test_urls = [
('https://httpbin.org/encoding/utf8', 'UTF-8'),
# 注意:httpbin没有其他编码的测试端点
# 这里只是演示代码结构
]
for url, expected_encoding in test_urls:
print(f"\n测试URL: {url}")
response = requests.get(url)
print(f" 预期编码: {expected_encoding}")
print(f" 实际编码: {response.encoding}")
    if response.encoding and response.encoding.upper() != expected_encoding.upper():
        print(f" ⚠ 编码不匹配!")
        # 直接用预期编码重新解码原始字节
        try:
            corrected_text = response.content.decode(expected_encoding)
            print(f" 纠正后文本前50字符: {corrected_text[:50]}")
        except UnicodeDecodeError as e:
            print(f" 编码纠正失败: {e}")
import requests
import chardet
from typing import Optional
class EncodingHandler:
"""编码处理工具类"""
@staticmethod
def detect_encoding(content: bytes) -> Optional[str]:
"""检测字节内容的编码"""
try:
result = chardet.detect(content)
if result['confidence'] > 0.7: # 置信度阈值
return result['encoding']
        except Exception:
            pass
# 尝试常见编码
common_encodings = ['utf-8', 'gbk', 'gb2312', 'big5', 'shift_jis', 'euc-jp']
for encoding in common_encodings:
try:
content.decode(encoding)
return encoding
except UnicodeDecodeError:
continue
return None
@staticmethod
def safe_decode(content: bytes, encoding: Optional[str] = None) -> str:
"""安全解码字节内容"""
if encoding:
try:
return content.decode(encoding)
except UnicodeDecodeError:
pass
# 自动检测
detected = EncodingHandler.detect_encoding(content)
if detected:
try:
return content.decode(detected)
except UnicodeDecodeError:
pass
        # 兜底: utf-8配合errors='replace'不会抛异常,直接返回
        return content.decode('utf-8', errors='replace')
@staticmethod
def normalize_encoding(encoding: str) -> str:
"""标准化编码名称"""
encoding = encoding.lower()
# 常见编码别名映射
aliases = {
'utf8': 'utf-8',
'utf-8-sig': 'utf-8',
'gb2312': 'gbk',
'cp936': 'gbk',
'big5-hkscs': 'big5',
'shift_jis': 'shift-jis',
'euc_jp': 'euc-jp'
}
return aliases.get(encoding, encoding)
@staticmethod
def fix_mojibake(text: str) -> str:
"""修复乱码"""
        # 常见乱码模式
        mojibake_patterns = [
            # UTF-8字节被错误地按Windows-1252解码
            ('Ã©', 'é'),
            ('Ã¨', 'è'),
            ('Ãª', 'ê'),
            ('Ã«', 'ë'),
            ('Ã¡', 'á'),
            ('Ã ', 'à'),
            ('Ã±', 'ñ'),
            ('Ã³', 'ó'),
            ('Ã¶', 'ö'),
            ('Ãº', 'ú'),
            ('Ã¼', 'ü'),
            ('Ã§', 'ç'),
            ('ÃŽ', 'Î'),
            ('Ã¥', 'å'),
            ('Ã¦', 'æ'),
            ('Ã¸', 'ø'),
            ('Ã¿', 'ÿ'),
            # UTF-8中文被错误地按Windows-1252解码,同样碎成拉丁字符
            ('ç»¿', '绿'),
            ('çº¢', '红'),
        ]
for wrong, right in mojibake_patterns:
text = text.replace(wrong, right)
return text
# 使用示例
print("=== 编码处理工具示例 ===")
# 测试不同编码的文本
test_cases = [
(b'Hello, World!', 'ascii'),
(b'\xe4\xbd\xa0\xe5\xa5\xbd', 'utf-8'), # 你好
(b'\xc4\xe3\xba\xc3', 'gbk'), # 你好
]
for content, expected_encoding in test_cases:
print(f"\n测试内容: {content}")
print(f"期望编码: {expected_encoding}")
# 检测编码
detected = EncodingHandler.detect_encoding(content)
print(f"检测到的编码: {detected}")
# 安全解码
decoded = EncodingHandler.safe_decode(content)
print(f"解码结果: {decoded}")
# 标准化编码
if detected:
normalized = EncodingHandler.normalize_encoding(detected)
print(f"标准化编码: {normalized}")
# 测试乱码修复
print(f"\n=== 乱码修复示例 ===")
# 模拟乱码(UTF-8的"Café"被错误地按Windows-1252解码)
mojibake_text = "CafÃ©"  # 应该是 "Café"
print(f"乱码文本: {mojibake_text}")
fixed_text = EncodingHandler.fix_mojibake(mojibake_text)
print(f"修复后: {fixed_text}")
# 处理网页编码
def fetch_with_correct_encoding(url):
"""获取网页并使用正确的编码"""
response = requests.get(url)
# 如果响应头没有指定编码或编码不正确
if not response.encoding or response.encoding.lower() == 'iso-8859-1':
# 检测实际编码
actual_encoding = EncodingHandler.detect_encoding(response.content)
if actual_encoding:
response.encoding = actual_encoding
print(f"检测到实际编码: {actual_encoding}")
else:
# 常见网页编码
response.encoding = 'utf-8'
print(f"使用默认编码: utf-8")
return response.text
# 模拟获取网页
print(f"\n=== 网页编码处理 ===")
# 注意:这里只是演示,实际使用时需要真实的URL
# html = fetch_with_correct_encoding('https://example.com')
将响应内容保存到文件是常见的需求,特别是下载图片、文档等资源时。
import requests
import os
from pathlib import Path
def save_response_to_file(response, filename, directory='downloads'):
"""保存响应内容到文件"""
# 创建目录
os.makedirs(directory, exist_ok=True)
# 完整文件路径
filepath = os.path.join(directory, filename)
# 根据内容类型决定保存方式
content_type = response.headers.get('content-type', '').lower()
if 'text' in content_type:
# 文本文件
with open(filepath, 'w', encoding='utf-8') as f:
f.write(response.text)
print(f"文本文件已保存: {filepath}")
else:
# 二进制文件
with open(filepath, 'wb') as f:
f.write(response.content)
print(f"二进制文件已保存: {filepath}")
# 验证文件
if os.path.exists(filepath):
file_size = os.path.getsize(filepath)
print(f"文件大小: {file_size} 字节")
return filepath
else:
print(f"文件保存失败")
return None
# 使用示例
print("=== 基本文件保存示例 ===")
# 保存文本文件
response = requests.get('https://httpbin.org/html')
save_response_to_file(response, 'example.html')
# 保存图片
response = requests.get('https://httpbin.org/image/png')
save_response_to_file(response, 'example.png')
# 保存JSON
response = requests.get('https://httpbin.org/json')
save_response_to_file(response, 'data.json')
# 从URL推断文件名
def get_filename_from_response(response, default='download'):
"""从响应中获取合适的文件名"""
# 从Content-Disposition头获取文件名
content_disposition = response.headers.get('content-disposition', '')
if 'filename=' in content_disposition:
# 提取文件名
import re
match = re.search(r'filename="?([^"]+)"?', content_disposition)
if match:
return match.group(1)
# 从URL获取文件名
url = response.url
filename = url.split('/')[-1]
# 清理文件名
if '?' in filename:
filename = filename.split('?')[0]
# 如果没有扩展名,根据内容类型添加
if '.' not in filename:
content_type = response.headers.get('content-type', '')
extensions = {
'text/html': '.html',
'application/json': '.json',
'image/jpeg': '.jpg',
'image/png': '.png',
'image/gif': '.gif',
'application/pdf': '.pdf',
'application/zip': '.zip',
'text/plain': '.txt'
}
for ct, ext in extensions.items():
if ct in content_type:
filename += ext
break
return filename or default
# 测试文件名获取
print(f"\n=== 文件名推断 ===")
test_responses = [
requests.get('https://httpbin.org/html'),
requests.get('https://httpbin.org/image/png'),
requests.get('https://httpbin.org/json'),
]
for response in test_responses:
filename = get_filename_from_response(response)
print(f"URL: {response.url}")
print(f" 推断的文件名: {filename}")
print(f" 内容类型: {response.headers.get('content-type')}")
import requests
import os
import time
from pathlib import Path
from typing import Optional, Dict, Any
from urllib.parse import urlparse
class FileDownloader:
"""高级文件下载器"""
def __init__(self, download_dir='downloads'):
self.download_dir = download_dir
os.makedirs(download_dir, exist_ok=True)
# 下载统计
self.stats = {
'total_downloads': 0,
'successful_downloads': 0,
'failed_downloads': 0,
'total_bytes': 0
}
def download(self, url: str,
filename: Optional[str] = None,
overwrite: bool = False,
chunk_size: int = 8192,
timeout: int = 30) -> Dict[str, Any]:
"""下载文件"""
self.stats['total_downloads'] += 1
result = {
'url': url,
'success': False,
'filename': None,
'filepath': None,
'size': 0,
'time_taken': 0,
'error': None
}
start_time = time.time()
try:
# 发送请求
response = requests.get(url, stream=True, timeout=timeout)
response.raise_for_status()
# 获取文件名
if not filename:
filename = self._get_filename(response, url)
# 完整文件路径
filepath = os.path.join(self.download_dir, filename)
# 检查文件是否已存在
if os.path.exists(filepath) and not overwrite:
result['error'] = f"文件已存在: {filepath}"
result['filepath'] = filepath
return result
# 获取文件大小(如果服务器支持)
total_size = int(response.headers.get('content-length', 0))
# 下载文件
downloaded = 0
with open(filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# 显示进度(对于大文件)
if total_size > 0:
percent = (downloaded / total_size) * 100
if downloaded % (chunk_size * 100) == 0:
print(f" 进度: {downloaded}/{total_size} 字节 ({percent:.1f}%)")
# 验证下载
actual_size = os.path.getsize(filepath)
if total_size > 0 and actual_size != total_size:
result['error'] = f"文件大小不匹配: 期望{total_size}, 实际{actual_size}"
os.remove(filepath) # 删除不完整的文件
else:
result['success'] = True
result['filename'] = filename
result['filepath'] = filepath
result['size'] = actual_size
self.stats['successful_downloads'] += 1
self.stats['total_bytes'] += actual_size
except requests.exceptions.Timeout:
result['error'] = f"请求超时 ({timeout}秒)"
except requests.exceptions.HTTPError as e:
result['error'] = f"HTTP错误: {e}"
except requests.exceptions.ConnectionError:
result['error'] = "连接错误"
except requests.exceptions.RequestException as e:
result['error'] = f"请求错误: {e}"
except IOError as e:
result['error'] = f"文件IO错误: {e}"
except Exception as e:
result['error'] = f"未知错误: {e}"
finally:
end_time = time.time()
result['time_taken'] = end_time - start_time
if not result['success']:
self.stats['failed_downloads'] += 1
return result
def _get_filename(self, response: requests.Response, url: str) -> str:
"""获取合适的文件名"""
# 方法1: 从Content-Disposition头获取
filename = self._get_filename_from_headers(response)
if filename:
return filename
# 方法2: 从URL路径获取
parsed_url = urlparse(url)
path = parsed_url.path
if path and path != '/':
filename = os.path.basename(path)
if filename:
return filename
# 方法3: 根据内容类型生成
content_type = response.headers.get('content-type', '').lower()
extensions = {
'text/html': '.html',
'application/json': '.json',
'image/jpeg': '.jpg',
'image/png': '.png',
'image/gif': '.gif',
'application/pdf': '.pdf',
'application/zip': '.zip',
'text/plain': '.txt',
'application/xml': '.xml',
'text/css': '.css',
'application/javascript': '.js'
}
ext = '.bin'
for ct, extension in extensions.items():
if ct in content_type:
ext = extension
break
# 生成基于时间的文件名
timestamp = int(time.time())
return f'download_{timestamp}{ext}'
def _get_filename_from_headers(self, response: requests.Response) -> Optional[str]:
"""从响应头获取文件名"""
content_disposition = response.headers.get('content-disposition', '')
if content_disposition:
# 查找filename=后面的内容
import re
# 处理 filename="file.txt" 或 filename=file.txt
patterns = [
r'filename="([^"]+)"',
r"filename='([^']+)'",
r'filename=([^;]+)'
]
for pattern in patterns:
match = re.search(pattern, content_disposition)
if match:
filename = match.group(1).strip()
# 清理文件名(移除路径和特殊字符)
filename = os.path.basename(filename)
filename = re.sub(r'[^\w\-_.]', '_', filename)
return filename
return None
def batch_download(self, urls, max_workers=3):
"""批量下载文件"""
results = []
# 简单实现(可以改进为多线程/多进程)
for i, url in enumerate(urls, 1):
print(f"下载 {i}/{len(urls)}: {url}")
result = self.download(url)
results.append(result)
if result['success']:
print(f" ✓ 成功: {result['filename']} ({result['size']} 字节)")
else:
print(f" ✗ 失败: {result['error']}")
return results
def get_stats(self):
"""获取下载统计"""
return self.stats.copy()
# 使用示例
print("=== 高级文件下载器示例 ===")
# 创建下载器
downloader = FileDownloader('my_downloads')
# 下载单个文件
print("\n1. 下载单个文件:")
result = downloader.download(
'https://httpbin.org/image/png',
filename='test_image.png',
overwrite=True
)
print(f" 成功: {result['success']}")
print(f" 文件: {result['filename']}")
print(f" 大小: {result['size']} 字节")
print(f" 耗时: {result['time_taken']:.2f} 秒")
# 批量下载
print("\n2. 批量下载测试:")
urls = [
'https://httpbin.org/html',
'https://httpbin.org/image/jpeg',
'https://httpbin.org/robots.txt',
# 添加更多URL...
]
results = downloader.batch_download(urls[:3]) # 只测试前3个
print("\n下载统计:")
stats = downloader.get_stats()
for key, value in stats.items():
print(f" {key}: {value}")
# 清理测试目录
import shutil
if os.path.exists('my_downloads'):
shutil.rmtree('my_downloads')
print("\n已清理测试目录")
import requests
import json
import csv
import io
from typing import Any, Dict, List, Optional
from xml.etree import ElementTree
class ResponseConverter:
"""响应内容转换器"""
@staticmethod
def to_json(response: requests.Response, **kwargs) -> Any:
"""转换为JSON"""
try:
return response.json(**kwargs)
except ValueError as e:
raise ValueError(f"JSON转换失败: {e}")
@staticmethod
def to_xml(response: requests.Response) -> ElementTree.Element:
"""转换为XML"""
try:
return ElementTree.fromstring(response.content)
except ElementTree.ParseError as e:
raise ValueError(f"XML转换失败: {e}")
@staticmethod
def to_csv(response: requests.Response,
delimiter: str = ',') -> List[Dict[str, str]]:
"""转换为CSV(列表字典)"""
try:
# 使用StringIO处理文本
text = response.text
csv_file = io.StringIO(text)
            # 读取CSV(csv模块已在顶部导入)
            reader = csv.DictReader(csv_file, delimiter=delimiter)
            return list(reader)
except Exception as e:
raise ValueError(f"CSV转换失败: {e}")
@staticmethod
def to_text(response: requests.Response,
strip: bool = True) -> str:
"""转换为文本"""
text = response.text
if strip:
text = text.strip()
return text
@staticmethod
def to_lines(response: requests.Response) -> List[str]:
"""转换为行列表"""
text = response.text
lines = text.splitlines()
# 移除空行
lines = [line.strip() for line in lines if line.strip()]
return lines
@staticmethod
def auto_convert(response: requests.Response) -> Any:
"""根据内容类型自动转换"""
content_type = response.headers.get('content-type', '').lower()
if 'json' in content_type:
return ResponseConverter.to_json(response)
elif 'xml' in content_type:
return ResponseConverter.to_xml(response)
        elif 'csv' in content_type:
return ResponseConverter.to_csv(response)
elif 'text/html' in content_type:
# 对于HTML,可以返回文本或使用BeautifulSoup解析
return ResponseConverter.to_text(response)
elif 'text/plain' in content_type:
return ResponseConverter.to_text(response)
else:
# 其他类型返回原始内容
return response.content
# 使用示例
print("=== 响应内容转换器示例 ===")
# 测试不同内容类型的转换
test_endpoints = [
('https://httpbin.org/json', 'application/json'),
('https://httpbin.org/xml', 'application/xml'),
# httpbin没有CSV端点,这里只是演示代码
# ('https://example.com/data.csv', 'text/csv'),
('https://httpbin.org/html', 'text/html'),
('https://httpbin.org/robots.txt', 'text/plain'),
]
for url, expected_type in test_endpoints:
print(f"\n测试: {url}")
print(f"期望类型: {expected_type}")
try:
response = requests.get(url)
# 自动转换
converted = ResponseConverter.auto_convert(response)
print(f"转换类型: {type(converted)}")
if isinstance(converted, dict):
print(f" JSON键: {list(converted.keys())[:5]}...")
elif isinstance(converted, ElementTree.Element):
print(f" XML根标签: {converted.tag}")
elif isinstance(converted, list):
print(f" 列表长度: {len(converted)}")
if converted and isinstance(converted[0], dict):
print(f" 第一行键: {list(converted[0].keys())}")
elif isinstance(converted, str):
print(f" 文本长度: {len(converted)} 字符")
print(f" 前50字符: {converted[:50]}...")
elif isinstance(converted, bytes):
print(f" 二进制长度: {len(converted)} 字节")
except Exception as e:
print(f" 错误: {e}")
import requests
import json
from typing import Dict, Any, List
from collections import Counter
import re
class ResponseAnalyzer:
"""响应内容分析器"""
@staticmethod
def analyze_text(response: requests.Response) -> Dict[str, Any]:
"""分析文本响应"""
text = response.text
# 基本统计
char_count = len(text)
line_count = len(text.splitlines())
word_count = len(text.split())
# 单词频率
words = re.findall(r'\b\w+\b', text.lower())
word_freq = Counter(words)
top_words = word_freq.most_common(10)
# 句子统计(简单实现)
sentences = re.split(r'[.!?]+', text)
sentence_count = len([s for s in sentences if s.strip()])
# 阅读时间估计(按平均阅读速度200词/分钟)
reading_time = word_count / 200 # 分钟
return {
'character_count': char_count,
'line_count': line_count,
'word_count': word_count,
'sentence_count': sentence_count,
'top_words': top_words,
'estimated_reading_time': f"{reading_time:.1f} 分钟"
}
@staticmethod
def analyze_json(response: requests.Response) -> Dict[str, Any]:
"""分析JSON响应"""
try:
data = response.json()
def analyze_structure(obj, depth=0):
"""递归分析JSON结构"""
if isinstance(obj, dict):
keys = list(obj.keys())
result = {
'type': 'object',
'key_count': len(keys),
'keys': keys,
'children': {}
}
for key, value in obj.items():
result['children'][key] = analyze_structure(value, depth+1)
return result
elif isinstance(obj, list):
if obj:
# 分析第一个元素作为示例
sample = analyze_structure(obj[0], depth+1)
return {
'type': 'array',
'length': len(obj),
'sample_type': sample['type'],
'sample': sample
}
else:
return {
'type': 'array',
'length': 0,
'sample_type': 'empty'
}
else:
return {
'type': type(obj).__name__,
'value': str(obj)[:100] # 截断长值
}
structure = analyze_structure(data)
return {
'data_type': type(data).__name__,
'structure': structure
}
except ValueError:
return {'error': '无效的JSON'}
@staticmethod
def analyze_html(response: requests.Response) -> Dict[str, Any]:
"""分析HTML响应(简单版本)"""
text = response.text
# 标签统计
        tags = re.findall(r'</?(\w+)', text)
tag_count = Counter(tags)
# 链接统计
links = re.findall(r'href="([^"]+)"', text)
internal_links = [l for l in links if l.startswith('#') or not l.startswith('http')]
external_links = [l for l in links if l.startswith('http')]
# 图片统计
        images = re.findall(r'<img[^>]*src="([^"]+)"', text)
# 标题统计
headings = {}
for level in range(1, 7):
            pattern = fr'<h{level}[^>]*>(.*?)</h{level}>'
matches = re.findall(pattern, text, re.IGNORECASE | re.DOTALL)
headings[f'h{level}'] = len(matches)
return {
'tag_count': dict(tag_count.most_common(10)),
'total_tags': sum(tag_count.values()),
'unique_tags': len(tag_count),
'link_count': len(links),
'internal_links': len(internal_links),
'external_links': len(external_links),
'image_count': len(images),
'heading_count': headings
}
@staticmethod
def generate_report(response: requests.Response) -> Dict[str, Any]:
"""生成完整的响应分析报告"""
content_type = response.headers.get('content-type', '').lower()
report = {
'url': response.url,
'status_code': response.status_code,
'content_type': content_type,
'content_length': len(response.content),
'headers': dict(response.headers),
'analysis': {}
}
# 根据内容类型进行分析
if 'json' in content_type:
report['analysis']['json'] = ResponseAnalyzer.analyze_json(response)
elif 'html' in content_type:
report['analysis']['html'] = ResponseAnalyzer.analyze_html(response)
elif 'text' in content_type:
report['analysis']['text'] = ResponseAnalyzer.analyze_text(response)
# 通用分析
report['analysis']['general'] = {
'encoding': response.encoding,
'elapsed_time': str(response.elapsed),
'is_redirect': response.is_redirect,
'redirect_history': len(response.history)
}
return report
# 使用示例
print("=== 响应内容分析器示例 ===")
# 分析HTML响应
print("\n1. 分析HTML响应:")
html_response = requests.get('https://httpbin.org/html')
html_report = ResponseAnalyzer.generate_report(html_response)
print(f"URL: {html_report['url']}")
print(f"状态码: {html_report['status_code']}")
print(f"内容类型: {html_report['content_type']}")
if 'html' in html_report['analysis']:
html_analysis = html_report['analysis']['html']
print(f"总标签数: {html_analysis.get('total_tags', 0)}")
print(f"链接数: {html_analysis.get('link_count', 0)}")
print(f"图片数: {html_analysis.get('image_count', 0)}")
# 分析JSON响应
print("\n2. 分析JSON响应:")
json_response = requests.get('https://httpbin.org/json')
json_report = ResponseAnalyzer.generate_report(json_response)
print(f"URL: {json_report['url']}")
print(f"状态码: {json_report['status_code']}")
if 'json' in json_report['analysis']:
json_analysis = json_report['analysis']['json']
print(f"数据类型: {json_analysis.get('data_type', '未知')}")
# 分析文本响应
print("\n3. 分析文本响应:")
text_response = requests.get('https://httpbin.org/robots.txt')
text_report = ResponseAnalyzer.generate_report(text_response)
print(f"URL: {text_report['url']}")
print(f"状态码: {text_report['status_code']}")
if 'text' in text_report['analysis']:
text_analysis = text_report['analysis']['text']
print(f"字符数: {text_analysis.get('character_count', 0)}")
print(f"单词数: {text_analysis.get('word_count', 0)}")
print(f"行数: {text_analysis.get('line_count', 0)}")
本章详细介绍了Requests库中响应内容的处理方法:

- response.text:处理HTML、XML、纯文本
- response.json():解析JSON,处理复杂结构
- response.content:处理图片、PDF等二进制数据

最佳实践建议:

- 使用raise_for_status()统一处理HTTP错误
- 根据Content-Type选择合适的内容访问方式
- 文本内容注意编码,必要时用apparent_encoding兜底
- 大文件使用stream=True分块读取或保存
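把这些要点串起来,一个稳健的取数函数大致如下(仅为示意草图,fetch为虚构函数名):

```python
import requests

def fetch(url, timeout=10):
    """综合本章要点: 状态码检查 + 按内容类型取值 + 编码兜底"""
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()  # 4xx/5xx直接抛HTTPError
    content_type = response.headers.get('content-type', '').lower()
    if 'json' in content_type:
        return response.json()   # JSON -> Python对象
    if any(t in content_type for t in ('text', 'html', 'xml')):
        if not response.encoding:
            response.encoding = response.apparent_encoding  # 编码兜底
        return response.text      # 文本
    return response.content      # 其余按二进制处理

# data = fetch('https://httpbin.org/json')
```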