在默认情况下,Requests会在你访问 response.content 或 response.text 时一次性下载整个响应体并存储在内存中。这对于小文件或API响应是可行的,但对于大文件或实时数据流,这会消耗大量内存。
流式请求 通过设置 stream=True 参数,允许你以流的方式读取响应内容,按需处理数据块,而不是一次性加载所有内容。
流式请求可以处理任意大小的文件,因为内存中只保存一小部分数据。
无需等待整个文件下载完成,可以边下载边处理数据,提高响应速度。
可以处理实时数据流,如服务器推送事件(SSE)或WebSocket替代方案。
| 场景 | 非流式请求 | 流式请求 | 推荐方案 |
|---|---|---|---|
| 小文件下载 (<1MB) | ✓ 适合 | 不必要 | 非流式 |
| 大文件下载 (>100MB) | ✗ 内存溢出风险 | ✓ 适合 | 流式 |
| 实时数据流 | ✗ 无法处理 | ✓ 适合 | 流式 |
| 视频/音频流 | ✗ 不适合 | ✓ 适合 | 流式 |
| API响应 | ✓ 适合 | 不必要 | 非流式 |
使用流式请求非常简单,只需在请求方法中添加 stream=True 参数:
import requests
# 设置stream=True启用流式请求
response = requests.get('https://httpbin.org/stream/20', stream=True)
# 重要:在使用流式响应时,需要手动检查状态码
if response.status_code == 200:
# 使用iter_content()或iter_lines()方法迭代处理数据
for chunk in response.iter_content(chunk_size=1024):
# 处理每个数据块
if chunk: # 过滤掉keep-alive空块
print(f"收到 {len(chunk)} 字节数据")
# 在这里处理chunk数据
print("流式请求完成!")
else:
print(f"请求失败,状态码: {response.status_code}")
# 重要:在使用流式请求后,应该手动关闭响应
response.close()
stream=True 后,Requests不会自动将响应内容存储到内存response.iter_content() 或 response.iter_lines() 来读取数据response.close() 释放连接with 语句自动管理资源推荐使用 with 语句确保响应被正确关闭:
import requests
# 使用with语句自动管理连接
with requests.get('https://httpbin.org/stream/5', stream=True) as response:
# 检查响应状态
if response.status_code == 200:
for line in response.iter_lines():
if line: # 过滤空行
print(f"收到行数据: {line.decode('utf-8')}")
else:
print(f"请求失败: {response.status_code}")
# with块结束后,响应会自动关闭,无需手动调用response.close()
下载大文件是流式请求最常见的应用场景。下面的示例演示如何下载大文件并保存到本地:
import requests
def download_large_file(url, filepath, chunk_size=8192):
"""
下载大文件到本地
参数:
url: 文件URL
filepath: 本地保存路径
chunk_size: 每次读取的数据块大小(字节)
"""
# 发送流式请求
response = requests.get(url, stream=True)
# 检查请求是否成功
response.raise_for_status()
# 获取文件总大小(如果服务器提供了Content-Length)
total_size = int(response.headers.get('content-length', 0))
print(f"开始下载: {url}")
if total_size > 0:
print(f"文件大小: {total_size / (1024*1024):.2f} MB")
# 打开文件进行写入
with open(filepath, 'wb') as file:
downloaded_size = 0
# 迭代读取数据块并写入文件
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk: # 过滤掉keep-alive空块
file.write(chunk)
downloaded_size += len(chunk)
# 显示下载进度(如果知道总大小)
if total_size > 0:
percent = (downloaded_size / total_size) * 100
print(f"\r下载进度: {percent:.1f}% ({downloaded_size}/{total_size} 字节)", end='')
print(f"\n下载完成! 文件已保存到: {filepath}")
# 自动关闭响应(使用with语句时自动处理)
# 使用示例
if __name__ == "__main__":
# 示例:下载一个测试文件
url = "https://www.example.com/largefile.zip"
save_path = "largefile.zip"
try:
download_large_file(url, save_path)
except requests.exceptions.RequestException as e:
print(f"下载失败: {e}")
import requests
import sys
import time
def download_with_progress(url, filepath, chunk_size=8192):
"""
下载大文件并显示进度条
参数:
url: 文件URL
filepath: 本地保存路径
chunk_size: 每次读取的数据块大小(字节)
"""
# 发送流式请求
response = requests.get(url, stream=True)
response.raise_for_status()
# 获取文件总大小
total_size = int(response.headers.get('content-length', 0))
print(f"开始下载: {url}")
if total_size == 0:
print("警告: 无法获取文件大小,将不显示进度条")
# 打开文件进行写入
with open(filepath, 'wb') as file:
downloaded_size = 0
start_time = time.time()
# 迭代读取数据块
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
file.write(chunk)
downloaded_size += len(chunk)
# 显示进度条
if total_size > 0:
# 计算进度百分比
percent = (downloaded_size / total_size) * 100
# 计算下载速度
elapsed_time = time.time() - start_time
if elapsed_time > 0:
speed = downloaded_size / elapsed_time / 1024 # KB/s
else:
speed = 0
# 构建进度条
bar_length = 50
filled_length = int(bar_length * downloaded_size // total_size)
bar = '█' * filled_length + '░' * (bar_length - filled_length)
# 打印进度信息
sys.stdout.write(f'\r下载进度: |{bar}| {percent:.1f}% '
f'{downloaded_size/(1024*1024):.1f}MB/{total_size/(1024*1024):.1f}MB '
f'速度: {speed:.1f}KB/s')
sys.stdout.flush()
# 下载完成
elapsed_time = time.time() - start_time
print(f"\n\n下载完成! 耗时: {elapsed_time:.1f}秒")
if elapsed_time > 0:
avg_speed = downloaded_size / elapsed_time / 1024 # KB/s
print(f"平均速度: {avg_speed:.1f}KB/s")
print(f"文件已保存到: {filepath}")
# 使用示例
if __name__ == "__main__":
url = "https://speed.hetzner.de/100MB.bin" # 测试用的100MB文件
save_path = "testfile.bin"
try:
download_with_progress(url, save_path)
except requests.exceptions.RequestException as e:
print(f"下载失败: {e}")
import requests
import os
import sys
def resume_download(url, filepath, chunk_size=8192):
"""
支持断点续传的文件下载
参数:
url: 文件URL
filepath: 本地保存路径
chunk_size: 每次读取的数据块大小(字节)
"""
# 检查文件是否已部分下载
if os.path.exists(filepath):
# 获取已下载的文件大小
downloaded_size = os.path.getsize(filepath)
print(f"发现已下载部分: {downloaded_size} 字节")
# 设置Range头请求剩余部分
headers = {'Range': f'bytes={downloaded_size}-'}
else:
downloaded_size = 0
headers = {}
# 发送带Range头的流式请求
response = requests.get(url, headers=headers, stream=True)
# 检查服务器是否支持断点续传
if downloaded_size > 0 and response.status_code != 206:
print("服务器不支持断点续传,将重新下载")
downloaded_size = 0
response.close()
response = requests.get(url, stream=True)
response.raise_for_status()
# 获取文件总大小
content_range = response.headers.get('content-range')
if content_range:
total_size = int(content_range.split('/')[-1])
else:
total_size = int(response.headers.get('content-length', 0))
print(f"开始{'续传' if downloaded_size > 0 else '下载'}: {url}")
# 以追加模式打开文件
mode = 'ab' if downloaded_size > 0 else 'wb'
with open(filepath, mode) as file:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
file.write(chunk)
downloaded_size += len(chunk)
# 显示进度
if total_size > 0:
percent = (downloaded_size / total_size) * 100
sys.stdout.write(f'\r下载进度: {percent:.1f}% '
f'({downloaded_size/(1024*1024):.1f}MB/{total_size/(1024*1024):.1f}MB)')
sys.stdout.flush()
print(f"\n下载完成! 文件已保存到: {filepath}")
# 使用示例
if __name__ == "__main__":
url = "https://www.example.com/largefile.zip"
save_path = "largefile.zip"
try:
resume_download(url, save_path)
except requests.exceptions.RequestException as e:
print(f"下载失败: {e}")
Requests提供了多种方法来迭代处理流式数据:
按指定大小的数据块迭代,适用于二进制文件或自定义处理。
for chunk in response.iter_content(chunk_size=1024):
process_chunk(chunk)
按行迭代,适用于文本文件或API流式响应。
for line in response.iter_lines():
process_line(line.decode('utf-8'))
使用解码器迭代处理,自动处理编码。
for chunk in response.iter_content(decode_unicode=True):
process_decoded_chunk(chunk)
import requests
import json
def process_json_stream(url):
"""
处理JSON流式API响应
示例API返回每行一个JSON对象
"""
try:
response = requests.get(url, stream=True)
response.raise_for_status()
for line in response.iter_lines():
if line: # 过滤空行
try:
# 解析JSON数据
data = json.loads(line.decode('utf-8'))
# 处理数据
print(f"收到数据: {data}")
# 这里可以添加业务逻辑
# 例如:存储到数据库、实时分析等
except json.JSONDecodeError as e:
print(f"JSON解析错误: {e}")
continue
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
finally:
if 'response' in locals():
response.close()
# 使用示例
if __name__ == "__main__":
# 使用一个返回JSON流的测试API
test_url = "https://httpbin.org/stream/10"
process_json_stream(test_url)
许多现代API提供流式端点,用于实时数据传输。下面是一些常见场景:
处理实时API流时,连接可能保持很长时间。需要正确处理超时、重连和错误处理。
import requests
import re
def listen_to_sse(url):
"""
监听服务器推送事件(SSE)
SSE格式: data: {json}\n\n
"""
try:
# 设置长时间超时
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
print("开始监听SSE流...")
buffer = ""
for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
if chunk:
buffer += chunk
# 查找完整的事件(以\n\n分隔)
while "\n\n" in buffer:
event, buffer = buffer.split("\n\n", 1)
# 提取data字段
data_match = re.search(r'data:\s*(.+)$', event, re.MULTILINE)
if data_match:
data_str = data_match.group(1)
print(f"收到事件: {data_str}")
# 这里可以处理事件数据
# 例如:更新UI、触发回调等
except requests.exceptions.Timeout:
print("连接超时,尝试重新连接...")
except requests.exceptions.RequestException as e:
print(f"连接错误: {e}")
finally:
if 'response' in locals():
response.close()
# 使用示例(需要实际的SSE端点)
# if __name__ == "__main__":
# sse_url = "https://example.com/api/events"
# listen_to_sse(sse_url)
import requests
import json
import time
class StreamListener:
"""流式API监听器"""
def __init__(self, api_url, auth_token):
self.api_url = api_url
self.headers = {
'Authorization': f'Bearer {auth_token}',
'Content-Type': 'application/json'
}
self.running = False
def on_data(self, data):
"""处理收到的数据"""
print(f"收到数据: {data.get('text', 'No text')[:50]}...")
# 这里可以添加自定义处理逻辑
return True
def on_error(self, error):
"""处理错误"""
print(f"流错误: {error}")
return False
def start(self):
"""开始监听流"""
self.running = True
while self.running:
try:
# 设置流式请求
response = requests.post(
self.api_url,
headers=self.headers,
json={'track': 'python'}, # 跟踪关键词
stream=True,
timeout=90
)
print("连接成功,开始接收数据流...")
# 处理数据流
for line in response.iter_lines():
if not self.running:
break
if line:
try:
data = json.loads(line.decode('utf-8'))
# 调用数据处理方法
if not self.on_data(data):
break
except json.JSONDecodeError:
print(f"无法解析JSON: {line[:100]}")
continue
except requests.exceptions.Timeout:
print("连接超时,5秒后重试...")
time.sleep(5)
continue
except requests.exceptions.RequestException as e:
print(f"请求错误: {e}")
if not self.on_error(str(e)):
break
time.sleep(10) # 等待后重试
def stop(self):
"""停止监听"""
self.running = False
print("停止监听流...")
# 使用示例(需要实际的API凭证)
# if __name__ == "__main__":
# listener = StreamListener(
# api_url="https://api.twitter.com/2/tweets/search/stream",
# auth_token="YOUR_BEARER_TOKEN"
# )
#
# try:
# listener.start()
# except KeyboardInterrupt:
# listener.stop()
根据网络速度和内存限制调整chunk_size:
对于多个流式请求,使用Session对象复用连接:
session = requests.Session()
# 所有使用session的请求都会复用连接
为长时间运行的流设置适当的超时:
# 连接超时和读取超时
response = requests.get(url, stream=True,
timeout=(3.05, 60))
使用with语句或手动关闭连接:
# 方法1: with语句
with requests.get(url, stream=True) as r:
process_stream(r)
# 方法2: 手动关闭
response.close()
下载视频、ISO镜像、数据库备份等大文件,避免内存溢出。
监控系统指标、日志流、传感器数据等实时信息。
实时接收Twitter、Reddit等社交媒体的数据流。
构建ETL管道,从API流式接收数据并实时处理。
Requests的流式请求功能是处理大文件、实时数据流和内存敏感场景的强大工具。通过设置 stream=True 参数,你可以以增量方式处理HTTP响应,避免一次性加载大量数据到内存中。
关键要点:
stream=True 启用流式模式iter_content() 或 iter_lines() 迭代处理数据with 语句或手动调用 response.close() 释放连接