Requests 流式请求

流式请求 允许你以增量方式处理HTTP响应,而不是一次性将整个响应内容加载到内存中。这对于处理大文件下载、实时数据流或内存敏感的应用场景非常重要。

什么是流式请求?

在默认情况下,Requests会在你访问 response.contentresponse.text 时一次性下载整个响应体并存储在内存中。这对于小文件或API响应是可行的,但对于大文件或实时数据流,这会消耗大量内存。

流式请求 通过设置 stream=True 参数,允许你以流的方式读取响应内容,按需处理数据块,而不是一次性加载所有内容。

非流式请求
  • 一次性加载整个响应到内存
  • 内存使用量 = 文件大小
  • 需要等待全部下载完成
  • 可能因大文件导致内存溢出
流式请求
  • 按需读取数据块
  • 内存使用量 = 数据块大小
  • 可以边下载边处理
  • 支持无限大小的数据流

为什么使用流式请求?

内存效率

流式请求可以处理任意大小的文件,因为内存中只保存一小部分数据。

即时处理

无需等待整个文件下载完成,可以边下载边处理数据,提高响应速度。

无限流处理

可以处理实时数据流,如服务器推送事件(SSE)或WebSocket替代方案。

场景 非流式请求 流式请求 推荐方案
小文件下载 (<1MB) ✓ 适合 不必要 非流式
大文件下载 (>100MB) ✗ 内存溢出风险 ✓ 适合 流式
实时数据流 ✗ 无法处理 ✓ 适合 流式
视频/音频流 ✗ 不适合 ✓ 适合 流式
API响应 ✓ 适合 不必要 非流式

基本使用方法

使用流式请求非常简单,只需在请求方法中添加 stream=True 参数:

基本流式请求示例
import requests

# 设置stream=True启用流式请求
response = requests.get('https://httpbin.org/stream/20', stream=True)

# 重要:在使用流式响应时,需要手动检查状态码
if response.status_code == 200:
    # 使用iter_content()或iter_lines()方法迭代处理数据
    for chunk in response.iter_content(chunk_size=1024):
        # 处理每个数据块
        if chunk:  # 过滤掉keep-alive空块
            print(f"收到 {len(chunk)} 字节数据")
            # 在这里处理chunk数据

    print("流式请求完成!")
else:
    print(f"请求失败,状态码: {response.status_code}")

# 重要:在使用流式请求后,应该手动关闭响应
response.close()
重要提示:
  • 使用 stream=True 后,Requests不会自动将响应内容存储到内存
  • 必须使用 response.iter_content()response.iter_lines() 来读取数据
  • 在处理完流式响应后,应该调用 response.close() 释放连接
  • 也可以使用 with 语句自动管理资源

使用with语句自动管理资源

推荐使用 with 语句确保响应被正确关闭:

使用with语句管理流式请求
import requests

# 使用with语句自动管理连接
with requests.get('https://httpbin.org/stream/5', stream=True) as response:
    # 检查响应状态
    if response.status_code == 200:
        for line in response.iter_lines():
            if line:  # 过滤空行
                print(f"收到行数据: {line.decode('utf-8')}")
    else:
        print(f"请求失败: {response.status_code}")

# with块结束后,响应会自动关闭,无需手动调用response.close()

大文件下载示例

下载大文件是流式请求最常见的应用场景。下面的示例演示如何下载大文件并保存到本地:

基础大文件下载
import requests

def download_large_file(url, filepath, chunk_size=8192):
    """
    下载大文件到本地

    参数:
        url: 文件URL
        filepath: 本地保存路径
        chunk_size: 每次读取的数据块大小(字节)
    """
    # 发送流式请求
    response = requests.get(url, stream=True)

    # 检查请求是否成功
    response.raise_for_status()

    # 获取文件总大小(如果服务器提供了Content-Length)
    total_size = int(response.headers.get('content-length', 0))

    print(f"开始下载: {url}")
    if total_size > 0:
        print(f"文件大小: {total_size / (1024*1024):.2f} MB")

    # 打开文件进行写入
    with open(filepath, 'wb') as file:
        downloaded_size = 0

        # 迭代读取数据块并写入文件
        for chunk in response.iter_content(chunk_size=chunk_size):
            if chunk:  # 过滤掉keep-alive空块
                file.write(chunk)
                downloaded_size += len(chunk)

                # 显示下载进度(如果知道总大小)
                if total_size > 0:
                    percent = (downloaded_size / total_size) * 100
                    print(f"\r下载进度: {percent:.1f}% ({downloaded_size}/{total_size} 字节)", end='')

        print(f"\n下载完成! 文件已保存到: {filepath}")

    # 自动关闭响应(使用with语句时自动处理)

# 使用示例
if __name__ == "__main__":
    # 示例:下载一个测试文件
    url = "https://www.example.com/largefile.zip"
    save_path = "largefile.zip"

    try:
        download_large_file(url, save_path)
    except requests.exceptions.RequestException as e:
        print(f"下载失败: {e}")
带进度条的大文件下载
import requests
import sys
import time

def download_with_progress(url, filepath, chunk_size=8192):
    """
    下载大文件并显示进度条

    参数:
        url: 文件URL
        filepath: 本地保存路径
        chunk_size: 每次读取的数据块大小(字节)
    """
    # 发送流式请求
    response = requests.get(url, stream=True)
    response.raise_for_status()

    # 获取文件总大小
    total_size = int(response.headers.get('content-length', 0))

    print(f"开始下载: {url}")
    if total_size == 0:
        print("警告: 无法获取文件大小,将不显示进度条")

    # 打开文件进行写入
    with open(filepath, 'wb') as file:
        downloaded_size = 0
        start_time = time.time()

        # 迭代读取数据块
        for chunk in response.iter_content(chunk_size=chunk_size):
            if chunk:
                file.write(chunk)
                downloaded_size += len(chunk)

                # 显示进度条
                if total_size > 0:
                    # 计算进度百分比
                    percent = (downloaded_size / total_size) * 100

                    # 计算下载速度
                    elapsed_time = time.time() - start_time
                    if elapsed_time > 0:
                        speed = downloaded_size / elapsed_time / 1024  # KB/s
                    else:
                        speed = 0

                    # 构建进度条
                    bar_length = 50
                    filled_length = int(bar_length * downloaded_size // total_size)
                    bar = '█' * filled_length + '░' * (bar_length - filled_length)

                    # 打印进度信息
                    sys.stdout.write(f'\r下载进度: |{bar}| {percent:.1f}% '
                                   f'{downloaded_size/(1024*1024):.1f}MB/{total_size/(1024*1024):.1f}MB '
                                   f'速度: {speed:.1f}KB/s')
                    sys.stdout.flush()

        # 下载完成
        elapsed_time = time.time() - start_time
        print(f"\n\n下载完成! 耗时: {elapsed_time:.1f}秒")
        if elapsed_time > 0:
            avg_speed = downloaded_size / elapsed_time / 1024  # KB/s
            print(f"平均速度: {avg_speed:.1f}KB/s")
        print(f"文件已保存到: {filepath}")

# 使用示例
if __name__ == "__main__":
    url = "https://speed.hetzner.de/100MB.bin"  # 测试用的100MB文件
    save_path = "testfile.bin"

    try:
        download_with_progress(url, save_path)
    except requests.exceptions.RequestException as e:
        print(f"下载失败: {e}")
支持断点续传的大文件下载
import requests
import os
import sys

def resume_download(url, filepath, chunk_size=8192):
    """
    支持断点续传的文件下载

    参数:
        url: 文件URL
        filepath: 本地保存路径
        chunk_size: 每次读取的数据块大小(字节)
    """
    # 检查文件是否已部分下载
    if os.path.exists(filepath):
        # 获取已下载的文件大小
        downloaded_size = os.path.getsize(filepath)
        print(f"发现已下载部分: {downloaded_size} 字节")

        # 设置Range头请求剩余部分
        headers = {'Range': f'bytes={downloaded_size}-'}
    else:
        downloaded_size = 0
        headers = {}

    # 发送带Range头的流式请求
    response = requests.get(url, headers=headers, stream=True)

    # 检查服务器是否支持断点续传
    if downloaded_size > 0 and response.status_code != 206:
        print("服务器不支持断点续传,将重新下载")
        downloaded_size = 0
        response.close()
        response = requests.get(url, stream=True)

    response.raise_for_status()

    # 获取文件总大小
    content_range = response.headers.get('content-range')
    if content_range:
        total_size = int(content_range.split('/')[-1])
    else:
        total_size = int(response.headers.get('content-length', 0))

    print(f"开始{'续传' if downloaded_size > 0 else '下载'}: {url}")

    # 以追加模式打开文件
    mode = 'ab' if downloaded_size > 0 else 'wb'
    with open(filepath, mode) as file:
        for chunk in response.iter_content(chunk_size=chunk_size):
            if chunk:
                file.write(chunk)
                downloaded_size += len(chunk)

                # 显示进度
                if total_size > 0:
                    percent = (downloaded_size / total_size) * 100
                    sys.stdout.write(f'\r下载进度: {percent:.1f}% '
                                   f'({downloaded_size/(1024*1024):.1f}MB/{total_size/(1024*1024):.1f}MB)')
                    sys.stdout.flush()

    print(f"\n下载完成! 文件已保存到: {filepath}")

# 使用示例
if __name__ == "__main__":
    url = "https://www.example.com/largefile.zip"
    save_path = "largefile.zip"

    try:
        resume_download(url, save_path)
    except requests.exceptions.RequestException as e:
        print(f"下载失败: {e}")

分块处理数据

Requests提供了多种方法来迭代处理流式数据:

iter_content()

按指定大小的数据块迭代,适用于二进制文件或自定义处理。

for chunk in response.iter_content(chunk_size=1024):
    process_chunk(chunk)
iter_lines()

按行迭代,适用于文本文件或API流式响应。

for line in response.iter_lines():
    process_line(line.decode('utf-8'))
iter_decode()

使用解码器迭代处理,自动处理编码。

for chunk in response.iter_content(decode_unicode=True):
    process_decoded_chunk(chunk)

分块处理示例

实时处理JSON流数据
import requests
import json

def process_json_stream(url):
    """
    处理JSON流式API响应
    示例API返回每行一个JSON对象
    """
    try:
        response = requests.get(url, stream=True)
        response.raise_for_status()

        for line in response.iter_lines():
            if line:  # 过滤空行
                try:
                    # 解析JSON数据
                    data = json.loads(line.decode('utf-8'))

                    # 处理数据
                    print(f"收到数据: {data}")

                    # 这里可以添加业务逻辑
                    # 例如:存储到数据库、实时分析等

                except json.JSONDecodeError as e:
                    print(f"JSON解析错误: {e}")
                    continue

    except requests.exceptions.RequestException as e:
        print(f"请求失败: {e}")
    finally:
        if 'response' in locals():
            response.close()

# 使用示例
if __name__ == "__main__":
    # 使用一个返回JSON流的测试API
    test_url = "https://httpbin.org/stream/10"
    process_json_stream(test_url)

处理实时API流

许多现代API提供流式端点,用于实时数据传输。下面是一些常见场景:

处理长时间运行的流

处理实时API流时,连接可能保持很长时间。需要正确处理超时、重连和错误处理。

服务器推送事件(SSE)示例

处理服务器推送事件(Server-Sent Events)
import requests
import re

def listen_to_sse(url):
    """
    监听服务器推送事件(SSE)
    SSE格式: data: {json}\n\n
    """
    try:
        # 设置长时间超时
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()

        print("开始监听SSE流...")

        buffer = ""
        for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
            if chunk:
                buffer += chunk

                # 查找完整的事件(以\n\n分隔)
                while "\n\n" in buffer:
                    event, buffer = buffer.split("\n\n", 1)

                    # 提取data字段
                    data_match = re.search(r'data:\s*(.+)$', event, re.MULTILINE)
                    if data_match:
                        data_str = data_match.group(1)
                        print(f"收到事件: {data_str}")

                        # 这里可以处理事件数据
                        # 例如:更新UI、触发回调等

    except requests.exceptions.Timeout:
        print("连接超时,尝试重新连接...")
    except requests.exceptions.RequestException as e:
        print(f"连接错误: {e}")
    finally:
        if 'response' in locals():
            response.close()

# 使用示例(需要实际的SSE端点)
# if __name__ == "__main__":
#     sse_url = "https://example.com/api/events"
#     listen_to_sse(sse_url)

处理Twitter/X API流示例

处理社交媒体实时流
import requests
import json
import time

class StreamListener:
    """流式API监听器"""

    def __init__(self, api_url, auth_token):
        self.api_url = api_url
        self.headers = {
            'Authorization': f'Bearer {auth_token}',
            'Content-Type': 'application/json'
        }
        self.running = False

    def on_data(self, data):
        """处理收到的数据"""
        print(f"收到数据: {data.get('text', 'No text')[:50]}...")
        # 这里可以添加自定义处理逻辑
        return True

    def on_error(self, error):
        """处理错误"""
        print(f"流错误: {error}")
        return False

    def start(self):
        """开始监听流"""
        self.running = True

        while self.running:
            try:
                # 设置流式请求
                response = requests.post(
                    self.api_url,
                    headers=self.headers,
                    json={'track': 'python'},  # 跟踪关键词
                    stream=True,
                    timeout=90
                )

                print("连接成功,开始接收数据流...")

                # 处理数据流
                for line in response.iter_lines():
                    if not self.running:
                        break

                    if line:
                        try:
                            data = json.loads(line.decode('utf-8'))

                            # 调用数据处理方法
                            if not self.on_data(data):
                                break

                        except json.JSONDecodeError:
                            print(f"无法解析JSON: {line[:100]}")
                            continue

            except requests.exceptions.Timeout:
                print("连接超时,5秒后重试...")
                time.sleep(5)
                continue
            except requests.exceptions.RequestException as e:
                print(f"请求错误: {e}")
                if not self.on_error(str(e)):
                    break
                time.sleep(10)  # 等待后重试

    def stop(self):
        """停止监听"""
        self.running = False
        print("停止监听流...")

# 使用示例(需要实际的API凭证)
# if __name__ == "__main__":
#     listener = StreamListener(
#         api_url="https://api.twitter.com/2/tweets/search/stream",
#         auth_token="YOUR_BEARER_TOKEN"
#     )
#
#     try:
#         listener.start()
#     except KeyboardInterrupt:
#         listener.stop()

性能优化技巧

1选择合适的chunk_size

根据网络速度和内存限制调整chunk_size:

  • 高速网络:较大的chunk_size(如64KB)
  • 低内存环境:较小的chunk_size(如4KB)
  • 默认值:通常8KB是一个好的起点
2使用连接池

对于多个流式请求,使用Session对象复用连接:

session = requests.Session()
# 所有使用session的请求都会复用连接
3设置合理的超时

为长时间运行的流设置适当的超时:

# 连接超时和读取超时
response = requests.get(url, stream=True,
                       timeout=(3.05, 60))
4及时关闭连接

使用with语句或手动关闭连接:

# 方法1: with语句
with requests.get(url, stream=True) as r:
    process_stream(r)

# 方法2: 手动关闭
response.close()
注意事项
  • 避免在循环中创建大量流式请求,可能导致连接耗尽
  • 处理大数据流时,考虑使用异步框架(如aiohttp)
  • 监控内存使用,确保不会因为缓冲数据而耗尽内存
  • 对于生产环境,实现完整的错误处理和重试机制

常见应用场景

大文件下载

下载视频、ISO镜像、数据库备份等大文件,避免内存溢出。

  • 视频流媒体服务
  • 软件分发平台
  • 数据备份系统
实时数据监控

监控系统指标、日志流、传感器数据等实时信息。

  • 服务器监控
  • IoT设备数据流
  • 实时日志分析
社交媒体流

实时接收Twitter、Reddit等社交媒体的数据流。

  • 舆情监控
  • 实时趋势分析
  • 社交媒体机器人
数据管道

构建ETL管道,从API流式接收数据并实时处理。

  • 实时数据仓库
  • 流式ETL处理
  • API数据同步

总结

Requests的流式请求功能是处理大文件、实时数据流和内存敏感场景的强大工具。通过设置 stream=True 参数,你可以以增量方式处理HTTP响应,避免一次性加载大量数据到内存中。

关键要点:

  • 使用 stream=True 启用流式模式
  • 使用 iter_content()iter_lines() 迭代处理数据
  • 使用 with 语句或手动调用 response.close() 释放连接
  • 根据应用场景选择合适的chunk_size
  • 为长时间运行的流实现错误处理和重试机制